mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.
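//!
//! Provides [`ScanMetricsSet`] and [`PartitionMetrics`] for collecting per-partition
//! scan metrics, and helpers such as [`scan_mem_ranges`] and [`scan_file_ranges`]
//! that build streams over memtable ranges and SST file ranges.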

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Verbose scan metrics for a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by the min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows in row groups filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row groups filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row groups filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by the precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of sends that found the channel full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF.
    stream_eof: bool,
}

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
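
// Illustrative example of the JSON-like output produced by the `Debug` impl above.
// The values are made up; zero-valued filter, distributor, and memtable counters are
// omitted by design:
//
//     {"prepare_scan_cost":"1.2ms", "build_reader_cost":"350µs", "scan_cost":"8.7ms",
//      "yield_cost":"120µs", "total_cost":"10.5ms", "num_rows":4096, "num_batches":4,
//      "num_mem_ranges":1, "num_file_ranges":2, "build_parts_cost":"900µs",
//      "rg_total":8, "rows_before_filter":65536, "num_sst_record_batches":16,
//      "num_sst_batches":16, "num_sst_rows":60928, "first_poll":"1.5ms",
//      "rows_precise_filtered":512, "stream_eof":true}
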
impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Observes metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that are only written to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}

/// List of [`PartitionMetrics`].
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats verbose metrics of each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

/// Metrics while reading a partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_parts_cost`.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}
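
// A minimal usage sketch of `PartitionMetrics` (illustrative only; the "SeqScan"
// label, `metrics_set`, and the local variables are placeholders rather than actual
// scanner code):
//
//     let part_metrics = PartitionMetrics::new(
//         region_id,
//         partition,
//         "SeqScan",      // scanner_type label
//         query_start,    // Instant captured when the query started
//         false,          // explain_verbose
//         &metrics_set,   // ExecutionPlanMetricsSet shared with DataFusion
//     );
//     part_metrics.on_first_poll();
//     part_metrics.inc_build_reader_cost(build_start.elapsed());
//     part_metrics.merge_metrics(&scanner_metrics);
//     part_metrics.on_finish(); // marks the stream as reaching EOF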

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends that found the channel full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
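
// A brief, illustrative sketch of consuming the stream returned by `scan_mem_ranges`
// (assumes the caller pins the stream and polls it with `futures::StreamExt`):
//
//     use futures::StreamExt;
//
//     let stream = scan_mem_ranges(stream_ctx, part_metrics, index, time_range);
//     let mut stream = Box::pin(stream);
//     while let Some(batch) = stream.next().await {
//         let batch = batch?;
//         // handle the Batch...
//     }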

/// Scans memtable ranges at `index` using the flat format that returns [`RecordBatch`]es.
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Scans file ranges at `index` using the flat reader that returns [`RecordBatch`]es.
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Builds the stream that scans the input [`FileRange`]s.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds the stream that scans the input [`FileRange`]s using the flat reader, yielding [`RecordBatch`]es.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader().await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();
            reader_metrics.merge_from(&prune_metrics);
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds the stream that scans the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}