mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.
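//!
//! This module mainly provides the verbose scan metrics types (`ScanMetricsSet`,
//! `PartitionMetrics`) and helpers that build the streams scanning memtable ranges,
//! file ranges and (enterprise-only) extension ranges.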

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Verbose scan metrics for a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of times the send channel was full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF.
    stream_eof: bool,
}

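// The `Debug` implementation below renders the metrics as a single JSON-like object so it
// can be embedded in explain output and logs. Zero-valued filter, distributor and memtable
// counters are skipped to keep the output compact.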
impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the collected metrics to the global metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

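/// Inner state shared by clones of [PartitionMetrics].
///
/// It keeps two tiers of metrics: the verbose [ScanMetricsSet] behind a mutex, which is
/// logged when the partition finishes, and a few DataFusion [Time] counters that always
/// report to the [ExecutionPlanMetricsSet].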
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that are only written to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

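// Dropping the last clone of a [PartitionMetrics] drops this inner struct, which flushes the
// verbose metrics to the global metrics, decrements the in-progress gauge and logs the
// per-partition summary.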
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        }
    }
}

/// List of [PartitionMetrics].
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats verbose metrics of each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

/// Metrics while reading a partition.
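///
/// Cloning is cheap and all clones share the same counters. A typical scanner creates it via
/// `PartitionMetrics::new`, calls `on_first_poll` when the partition stream is polled for the
/// first time, merges per-range metrics while scanning, and calls `on_finish` once the stream
/// reaches EOF. This is a sketch of the intended flow rather than a strict contract; any
/// counters not flushed explicitly are reported when the last clone is dropped.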
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_parts_cost`.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times the send channel was full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`.
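///
/// The returned stream lazily builds a prune iterator for each memtable range and reports the
/// collected memtable scan metrics to `part_metrics` after each range is exhausted.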
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the memtable scan metrics to the partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans memtable ranges at `index` using the flat format, which returns [`RecordBatch`]es.
#[allow(dead_code)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable scan metrics to the partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans file ranges at `index`.
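///
/// The [`FileRange`]s are built eagerly here so their build cost is recorded before the stream
/// is polled; the actual reading is deferred to [build_file_range_scan_stream].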
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Scans file ranges at `index` using the flat reader, which returns [`RecordBatch`]es.
#[allow(dead_code)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Builds a stream that scans the input [`FileRange`]s.
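///
/// One reader is built per range inside the stream; the accumulated `ReaderMetrics` are
/// merged into `part_metrics` after the last range has been scanned.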
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds a stream that scans the input [`FileRange`]s using the flat reader, which returns
/// [`RecordBatch`]es.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader().await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();
            reader_metrics.merge_from(&prune_metrics);
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds a stream that scans the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

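/// Scans ranges that are neither memtable nor file ranges.
///
/// With the `enterprise` feature enabled this delegates to `scan_extension_range`; otherwise
/// there are no other range kinds and an error is returned.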
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

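/// Flat-format counterpart of [maybe_scan_other_ranges]. There are currently no other range
/// kinds scannable in the flat format, so this always returns an error.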
#[allow(dead_code)]
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}