mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Verbose scan metrics for a partition.
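///
/// The `Debug` implementation renders these metrics as a compact JSON-like
/// object. Optional counters (index filters, series distributor, memtable)
/// are only written when they are non-zero to keep the log and explain
/// output short.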
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of times a send hit a full channel in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF.
    stream_eof: bool,
}

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Observes metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

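/// Shared state behind [`PartitionMetrics`].
///
/// Holds both the verbose [`ScanMetricsSet`] that is written to the logs (and
/// to verbose explain output) and the [`Time`] counters that are always
/// reported to the DataFusion [`ExecutionPlanMetricsSet`].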
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

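// Metrics are flushed when the last `PartitionMetrics` clone is dropped: the
// accumulated `ScanMetricsSet` is reported to the Prometheus counters and
// written to the logs (info level for verbose explain, debug level otherwise).
// `on_finish(false)` only fills in `total_cost` here, so `stream_eof` stays
// `false` if the stream was dropped before reaching EOF.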
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}

/// List of PartitionMetrics.
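///
/// Each partition registers its metrics via [`PartitionMetricsList::set`] so
/// that verbose explain output can render every partition together through
/// [`PartitionMetricsList::format_verbose_metrics`].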
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats verbose metrics for each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

/// Metrics while reading a partition.
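///
/// Cloning is cheap; all clones share the same inner state behind an [`Arc`],
/// and the metrics are flushed once the last clone is dropped.
///
/// Illustrative usage sketch (placeholders, not an actual call site; see the
/// scanners in this crate for real usage):
///
/// ```ignore
/// let metrics_set = ExecutionPlanMetricsSet::new();
/// let metrics = PartitionMetrics::new(
///     region_id, partition, "scanner_type", query_start, false, &metrics_set);
/// metrics.on_first_poll();
/// // ... merge per-range scanner and reader metrics while scanning ...
/// metrics.on_finish(); // marks the stream as EOF
/// ```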
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_reader_cost`.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times a send hit a full channel in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the series distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the series distributor to yield.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`.
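///
/// Builds a prune iterator for every memtable range at `index`, streams the
/// batches it yields, and reports the collected [`MemScanMetrics`] to
/// `part_metrics` once each range is exhausted.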
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans memtable ranges at `index` using the flat format, yielding [`RecordBatch`]es.
#[allow(dead_code)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans file ranges at `index`.
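///
/// File ranges are built eagerly here (recording their build cost and filter
/// metrics in `part_metrics`), while the returned stream scans them lazily via
/// [`build_file_range_scan_stream`].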
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Scans file ranges at `index` using the flat reader, yielding [`RecordBatch`]es.
#[allow(dead_code)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Builds the stream that scans the input [`FileRange`]s.
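///
/// The reader for each range is built lazily inside the stream. Row and filter
/// metrics collected by the prune readers are merged into `part_metrics` after
/// all ranges have been read.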
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds the stream that scans the input [`FileRange`]s using the flat reader, yielding [`RecordBatch`]es.
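///
/// Unlike [`build_file_range_scan_stream`], this reads [`RecordBatch`]es
/// directly and applies the flat compat adapter when a range carries one.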
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader().await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();
            reader_metrics.merge_from(&prune_metrics);
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds the stream that scans the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

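/// Scans ranges that are neither memtable nor file ranges.
///
/// With the `enterprise` feature enabled this delegates to
/// `scan_extension_range`; otherwise it always returns an `Unexpected` error.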
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

#[allow(dead_code)]
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}