// mito2/read/scan_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for scanners.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
23use futures::Stream;
24use prometheus::IntGauge;
25use smallvec::SmallVec;
26use store_api::storage::RegionId;
27
28use crate::error::Result;
29use crate::memtable::MemScanMetrics;
30use crate::metrics::{
31    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
32    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
33};
34use crate::read::range::{RangeBuilderList, RowGroupIndex};
35use crate::read::scan_region::StreamContext;
36use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source};
37use crate::sst::file::FileTimeRange;
38use crate::sst::parquet::file_range::FileRange;
39use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
40
/// Verbose scan metrics for a partition.
///
/// Accumulated under a mutex inside [PartitionMetricsInner] by merging
/// scanner, SST reader, memtable and series-distributor metrics. The set is
/// logged (via its JSON-like `Debug` output) and reported to Prometheus when
/// the partition finishes.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeout in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,

    /// The stream reached EOF
    stream_eof: bool,
}
122
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a JSON-like object.
    ///
    /// Core metrics are always printed; counters that are frequently zero
    /// (index filters, distributor and memtable metrics) are printed only
    /// when non-zero to keep log lines compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure exhaustively so adding a field to `ScanMetricsSet`
        // without updating this formatter becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Close the JSON object.
        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
253impl ScanMetricsSet {
254    /// Attaches the `prepare_scan_cost` to the metrics set.
255    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
256        self.prepare_scan_cost += cost;
257        self
258    }
259
260    /// Merges the local scanner metrics.
261    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
262        let ScannerMetrics {
263            prepare_scan_cost,
264            build_reader_cost,
265            scan_cost,
266            yield_cost,
267            num_batches,
268            num_rows,
269            num_mem_ranges,
270            num_file_ranges,
271        } = other;
272
273        self.prepare_scan_cost += *prepare_scan_cost;
274        self.build_reader_cost += *build_reader_cost;
275        self.scan_cost += *scan_cost;
276        self.yield_cost += *yield_cost;
277        self.num_rows += *num_rows;
278        self.num_batches += *num_batches;
279        self.num_mem_ranges += *num_mem_ranges;
280        self.num_file_ranges += *num_file_ranges;
281    }
282
283    /// Merges the local reader metrics.
284    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
285        let ReaderMetrics {
286            build_cost,
287            filter_metrics:
288                ReaderFilterMetrics {
289                    rg_total,
290                    rg_fulltext_filtered,
291                    rg_inverted_filtered,
292                    rg_minmax_filtered,
293                    rg_bloom_filtered,
294                    rows_total,
295                    rows_fulltext_filtered,
296                    rows_inverted_filtered,
297                    rows_bloom_filtered,
298                    rows_precise_filtered,
299                },
300            num_record_batches,
301            num_batches,
302            num_rows,
303            scan_cost: _,
304        } = other;
305
306        self.build_parts_cost += *build_cost;
307
308        self.rg_total += *rg_total;
309        self.rg_fulltext_filtered += *rg_fulltext_filtered;
310        self.rg_inverted_filtered += *rg_inverted_filtered;
311        self.rg_minmax_filtered += *rg_minmax_filtered;
312        self.rg_bloom_filtered += *rg_bloom_filtered;
313
314        self.rows_before_filter += *rows_total;
315        self.rows_fulltext_filtered += *rows_fulltext_filtered;
316        self.rows_inverted_filtered += *rows_inverted_filtered;
317        self.rows_bloom_filtered += *rows_bloom_filtered;
318        self.rows_precise_filtered += *rows_precise_filtered;
319
320        self.num_sst_record_batches += *num_record_batches;
321        self.num_sst_batches += *num_batches;
322        self.num_sst_rows += *num_rows;
323    }
324
325    /// Sets distributor metrics.
326    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
327        let SeriesDistributorMetrics {
328            num_series_send_timeout,
329            num_series_send_full,
330            num_rows,
331            num_batches,
332            scan_cost,
333            yield_cost,
334        } = distributor_metrics;
335
336        self.num_series_send_timeout += *num_series_send_timeout;
337        self.num_series_send_full += *num_series_send_full;
338        self.num_distributor_rows += *num_rows;
339        self.num_distributor_batches += *num_batches;
340        self.distributor_scan_cost += *scan_cost;
341        self.distributor_yield_cost += *yield_cost;
342    }
343
344    /// Observes metrics.
345    fn observe_metrics(&self) {
346        READ_STAGE_ELAPSED
347            .with_label_values(&["prepare_scan"])
348            .observe(self.prepare_scan_cost.as_secs_f64());
349        READ_STAGE_ELAPSED
350            .with_label_values(&["build_reader"])
351            .observe(self.build_reader_cost.as_secs_f64());
352        READ_STAGE_ELAPSED
353            .with_label_values(&["scan"])
354            .observe(self.scan_cost.as_secs_f64());
355        READ_STAGE_ELAPSED
356            .with_label_values(&["yield"])
357            .observe(self.yield_cost.as_secs_f64());
358        READ_STAGE_ELAPSED
359            .with_label_values(&["total"])
360            .observe(self.total_cost.as_secs_f64());
361        READ_ROWS_RETURN.observe(self.num_rows as f64);
362        READ_BATCHES_RETURN.observe(self.num_batches as f64);
363
364        READ_STAGE_ELAPSED
365            .with_label_values(&["build_parts"])
366            .observe(self.build_parts_cost.as_secs_f64());
367
368        READ_ROW_GROUPS_TOTAL
369            .with_label_values(&["before_filtering"])
370            .inc_by(self.rg_total as u64);
371        READ_ROW_GROUPS_TOTAL
372            .with_label_values(&["fulltext_index_filtered"])
373            .inc_by(self.rg_fulltext_filtered as u64);
374        READ_ROW_GROUPS_TOTAL
375            .with_label_values(&["inverted_index_filtered"])
376            .inc_by(self.rg_inverted_filtered as u64);
377        READ_ROW_GROUPS_TOTAL
378            .with_label_values(&["minmax_index_filtered"])
379            .inc_by(self.rg_minmax_filtered as u64);
380        READ_ROW_GROUPS_TOTAL
381            .with_label_values(&["bloom_filter_index_filtered"])
382            .inc_by(self.rg_bloom_filtered as u64);
383
384        PRECISE_FILTER_ROWS_TOTAL
385            .with_label_values(&["parquet"])
386            .inc_by(self.rows_precise_filtered as u64);
387        READ_ROWS_IN_ROW_GROUP_TOTAL
388            .with_label_values(&["before_filtering"])
389            .inc_by(self.rows_before_filter as u64);
390        READ_ROWS_IN_ROW_GROUP_TOTAL
391            .with_label_values(&["fulltext_index_filtered"])
392            .inc_by(self.rows_fulltext_filtered as u64);
393        READ_ROWS_IN_ROW_GROUP_TOTAL
394            .with_label_values(&["inverted_index_filtered"])
395            .inc_by(self.rows_inverted_filtered as u64);
396        READ_ROWS_IN_ROW_GROUP_TOTAL
397            .with_label_values(&["bloom_filter_index_filtered"])
398            .inc_by(self.rows_bloom_filtered as u64);
399    }
400}
401
/// Shared state behind [PartitionMetrics].
///
/// Holds both the verbose [ScanMetricsSet] (logged when this struct drops)
/// and the DataFusion [Time] metrics reported to the `ExecutionPlanMetricsSet`.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operation.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; decremented on drop.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
}
428
429impl PartitionMetricsInner {
430    fn on_finish(&self, stream_eof: bool) {
431        let mut metrics = self.metrics.lock().unwrap();
432        if metrics.total_cost.is_zero() {
433            metrics.total_cost = self.query_start.elapsed();
434        }
435        if !metrics.stream_eof {
436            metrics.stream_eof = stream_eof;
437        }
438    }
439}
440
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Finalize the total cost without marking EOF: if the stream actually
        // completed, `on_finish(true)` has already latched `stream_eof`.
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        // Report the accumulated metrics to Prometheus exactly once.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            // Verbose explain requested: surface the metrics at info level.
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        }
    }
}
461
/// List of PartitionMetrics, indexed by partition, guarded by a mutex.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
465
466impl PartitionMetricsList {
467    /// Sets a new [PartitionMetrics] at the specified partition.
468    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
469        let mut list = self.0.lock().unwrap();
470        if list.len() <= partition {
471            list.resize(partition + 1, None);
472        }
473        list[partition] = Some(metrics);
474    }
475
476    /// Format verbose metrics for each partition for explain.
477    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
478        let list = self.0.lock().unwrap();
479        write!(f, ", \"metrics_per_partition\": ")?;
480        f.debug_list()
481            .entries(list.iter().filter_map(|p| p.as_ref()))
482            .finish()?;
483        write!(f, "}}")
484    }
485}
486
/// Metrics while reading a partition.
///
/// Cheap to clone: all state lives behind a shared `Arc`.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
490
491impl PartitionMetrics {
492    pub(crate) fn new(
493        region_id: RegionId,
494        partition: usize,
495        scanner_type: &'static str,
496        query_start: Instant,
497        explain_verbose: bool,
498        metrics_set: &ExecutionPlanMetricsSet,
499    ) -> Self {
500        let partition_str = partition.to_string();
501        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
502        in_progress_scan.inc();
503        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
504        let inner = PartitionMetricsInner {
505            region_id,
506            partition,
507            scanner_type,
508            query_start,
509            explain_verbose,
510            metrics: Mutex::new(metrics),
511            in_progress_scan,
512            build_parts_cost: MetricBuilder::new(metrics_set)
513                .subset_time("build_parts_cost", partition),
514            build_reader_cost: MetricBuilder::new(metrics_set)
515                .subset_time("build_reader_cost", partition),
516            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
517            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
518            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
519        };
520        Self(Arc::new(inner))
521    }
522
523    pub(crate) fn on_first_poll(&self) {
524        let mut metrics = self.0.metrics.lock().unwrap();
525        metrics.first_poll = self.0.query_start.elapsed();
526    }
527
528    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
529        let mut metrics = self.0.metrics.lock().unwrap();
530        metrics.num_mem_ranges += num;
531    }
532
533    pub fn inc_num_file_ranges(&self, num: usize) {
534        let mut metrics = self.0.metrics.lock().unwrap();
535        metrics.num_file_ranges += num;
536    }
537
538    /// Merges `build_reader_cost`.
539    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
540        self.0.build_reader_cost.add_duration(cost);
541
542        let mut metrics = self.0.metrics.lock().unwrap();
543        metrics.build_reader_cost += cost;
544    }
545
546    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
547        self.0.convert_cost.add_duration(cost);
548    }
549
550    /// Reports memtable scan metrics.
551    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
552        let mut metrics = self.0.metrics.lock().unwrap();
553        metrics.mem_scan_cost += data.scan_cost;
554        metrics.mem_rows += data.num_rows;
555        metrics.mem_batches += data.num_batches;
556        metrics.mem_series += data.total_series;
557    }
558
559    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
560    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
561        self.0
562            .build_reader_cost
563            .add_duration(metrics.build_reader_cost);
564        self.0.scan_cost.add_duration(metrics.scan_cost);
565        self.0.yield_cost.add_duration(metrics.yield_cost);
566
567        let mut metrics_set = self.0.metrics.lock().unwrap();
568        metrics_set.merge_scanner_metrics(metrics);
569    }
570
571    /// Merges [ReaderMetrics] and `build_reader_cost`.
572    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
573        self.0.build_parts_cost.add_duration(metrics.build_cost);
574
575        let mut metrics_set = self.0.metrics.lock().unwrap();
576        metrics_set.merge_reader_metrics(metrics);
577    }
578
579    /// Finishes the query.
580    pub(crate) fn on_finish(&self) {
581        self.0.on_finish(true);
582    }
583
584    /// Sets the distributor metrics.
585    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
586        let mut metrics_set = self.0.metrics.lock().unwrap();
587        metrics_set.set_distributor_metrics(metrics);
588    }
589}
590
591impl fmt::Debug for PartitionMetrics {
592    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593        let metrics = self.0.metrics.lock().unwrap();
594        write!(
595            f,
596            r#"{{"partition":{}, "metrics":{:?}}}"#,
597            self.0.partition, metrics
598        )
599    }
600}
601
/// Metrics for the series distributor.
///
/// Collected separately and folded into [ScanMetricsSet] via
/// `set_distributor_metrics`.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeout in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the series distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the series distributor to yield.
    pub(crate) yield_cost: Duration,
}
618
619/// Scans memtable ranges at `index`.
620pub(crate) fn scan_mem_ranges(
621    stream_ctx: Arc<StreamContext>,
622    part_metrics: PartitionMetrics,
623    index: RowGroupIndex,
624    time_range: FileTimeRange,
625) -> impl Stream<Item = Result<Batch>> {
626    try_stream! {
627        let ranges = stream_ctx.input.build_mem_ranges(index);
628        part_metrics.inc_num_mem_ranges(ranges.len());
629        for range in ranges {
630            let build_reader_start = Instant::now();
631            let mem_scan_metrics = Some(MemScanMetrics::default());
632            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
633            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
634
635            let mut source = Source::Iter(iter);
636            while let Some(batch) = source.next_batch().await? {
637                yield batch;
638            }
639
640            // Report the memtable scan metrics to partition metrics
641            if let Some(ref metrics) = mem_scan_metrics {
642                let data = metrics.data();
643                part_metrics.report_mem_scan_metrics(&data);
644            }
645        }
646    }
647}
648
649/// Scans file ranges at `index`.
650pub(crate) async fn scan_file_ranges(
651    stream_ctx: Arc<StreamContext>,
652    part_metrics: PartitionMetrics,
653    index: RowGroupIndex,
654    read_type: &'static str,
655    range_builder: Arc<RangeBuilderList>,
656) -> Result<impl Stream<Item = Result<Batch>>> {
657    let mut reader_metrics = ReaderMetrics::default();
658    let ranges = range_builder
659        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
660        .await?;
661    part_metrics.inc_num_file_ranges(ranges.len());
662    part_metrics.merge_reader_metrics(&reader_metrics);
663
664    Ok(build_file_range_scan_stream(
665        stream_ctx,
666        part_metrics,
667        read_type,
668        ranges,
669    ))
670}
671
/// Build the stream of scanning the input [`FileRange`]s.
///
/// Reader construction cost is charged to `part_metrics` per range; read
/// counters are observed under `read_type` and merged into `part_metrics`
/// after the last range. Note the final reporting only runs if the stream
/// is polled to completion.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                // Adapt batches to the current schema when the range needs it.
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.compat_batch(batch)?;
                }
                yield batch;
            }
            // Collect the reader's metrics once the source is exhausted.
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}
706
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        // Wrap errors from the external reader into this crate's error domain.
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
724
725pub(crate) async fn maybe_scan_other_ranges(
726    context: &Arc<StreamContext>,
727    index: RowGroupIndex,
728    metrics: &PartitionMetrics,
729) -> Result<BoxedBatchStream> {
730    #[cfg(feature = "enterprise")]
731    {
732        scan_extension_range(context.clone(), index, metrics.clone()).await
733    }
734
735    #[cfg(not(feature = "enterprise"))]
736    {
737        let _ = context;
738        let _ = index;
739        let _ = metrics;
740
741        crate::error::UnexpectedSnafu {
742            reason: "no other ranges scannable",
743        }
744        .fail()
745    }
746}