mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;

/// Per-file scan metrics.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}

impl fmt::Debug for FileScanMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;

        if self.num_ranges > 0 {
            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
        }
        if self.num_rows > 0 {
            write!(f, ", \"num_rows\":{}", self.num_rows)?;
        }
        if !self.build_reader_cost.is_zero() {
            write!(
                f,
                ", \"build_reader_cost\":\"{:?}\"",
                self.build_reader_cost
            )?;
        }
        if !self.scan_cost.is_zero() {
            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
        }

        write!(f, "}}")
    }
}

impl FileScanMetrics {
    /// Merges another [FileScanMetrics] into this one.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        self.num_ranges += other.num_ranges;
        self.num_rows += other.num_rows;
        self.build_part_cost += other.build_part_cost;
        self.build_reader_cost += other.build_reader_cost;
        self.scan_cost += other.scan_cost;
    }
}

/// Verbose scan metrics for a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration to convert [`Batch`]es.
    convert_cost: Option<Time>,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of index result cache hits for fulltext index.
    fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    bloom_filter_cache_miss: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of times the send channel was full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Time the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Time the series distributor spent yielding.
    distributor_yield_cost: Duration,
    /// Duration spent in divider operations.
    distributor_divider_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory usage for file range builders.
    build_ranges_mem_size: isize,
    /// Peak memory usage for file range builders.
    build_ranges_peak_mem_size: isize,
    /// Current number of file range builders.
    num_range_builders: isize,
    /// Peak number of file range builders.
    num_peak_range_builders: isize,
}

/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
    total_cost: Duration,
    file_id: RegionFileId,
    metrics: &'a FileScanMetrics,
}

impl Ord for CompareCostReverse<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse comparison: smaller costs are "greater"
        other.total_cost.cmp(&self.total_cost)
    }
}

impl PartialOrd for CompareCostReverse<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for CompareCostReverse<'_> {}

impl PartialEq for CompareCostReverse<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
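
// The reversed `Ord` above is the standard min-heap trick: `BinaryHeap` is a
// max-heap, so inverting the comparison keeps the *cheapest* retained entry at
// the top, making top-K selection O(n log K). A minimal, self-contained sketch
// of the same pattern using `std::cmp::Reverse` (the `top_k` name is
// illustrative only, not part of this module):
//
// use std::cmp::Reverse;
// use std::collections::BinaryHeap;
//
// /// Returns the `k` largest values, largest first.
// fn top_k(values: &[u64], k: usize) -> Vec<u64> {
//     // Holds `Reverse(v)`, so the heap is a min-heap over `v`.
//     let mut heap = BinaryHeap::new();
//     for &v in values {
//         if heap.len() < k {
//             heap.push(Reverse(v));
//         } else if let Some(&Reverse(min)) = heap.peek() {
//             if v > min {
//                 heap.pop();
//                 heap.push(Reverse(v));
//             }
//         }
//     }
//     // Ascending by `Reverse(v)` is descending by `v`.
//     heap.into_sorted_vec().into_iter().map(|r| r.0).collect()
// }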

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write convert_cost if present
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // If current cost is higher than the minimum in our top-10, replace it
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
             \"num_peak_range_builders\":{num_peak_range_builders}, \
             \"stream_eof\":{stream_eof}}}"
        )
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Attaches the `convert_cost` to the metrics set.
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Merge optional verbose metrics
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track memory usage and update peak.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track number of builders and update peak.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file metrics.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Observes metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}

struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl MergeMetricsReport for PartitionMetricsInner {
    fn report(&self, metrics: &mut MergeMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        // Merge the metrics into scan_metrics
        scan_metrics.merge_metrics.merge(metrics);

        // Reset the input metrics
        *metrics = MergeMetrics::default();
    }
}

impl DedupMetricsReport for PartitionMetricsInner {
    fn report(&self, metrics: &mut DedupMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        // Merge the metrics into scan_metrics
        scan_metrics.dedup_metrics.merge(metrics);

        // Reset the input metrics
        *metrics = DedupMetrics::default();
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        }
    }
}
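
// Note: reporting from `Drop` above means the Prometheus observers fire and the
// in-progress gauge decrements exactly once per partition, even if the scan
// stream is dropped before EOF (e.g. the query is cancelled or stops at a
// limit). A minimal sketch of this RAII-style reporting pattern (hypothetical
// names, not part of this module):
//
// struct ScopedScanReport {
//     rows: usize,
// }
//
// impl Drop for ScopedScanReport {
//     fn drop(&mut self) {
//         // Runs on normal completion, early return, and cancellation alike.
//         println!("scan finished, rows: {}", self.rows);
//     }
// }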

/// List of PartitionMetrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats verbose metrics for each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

/// Metrics while reading a partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics] into the metrics set and records `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and records `build_parts_cost`.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        // Merge per-file metrics if provided
        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}
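
// The reporter methods above return `self.0.clone()` as `Arc<dyn ...Report>`:
// Rust's unsized coercion converts `Arc<PartitionMetricsInner>` into an
// `Arc<dyn Trait>` for any trait the inner type implements, so both reporters
// share one allocation with the partition metrics. A minimal sketch of the
// coercion (hypothetical names, not part of this module):
//
// trait Report {
//     fn report(&self);
// }
//
// struct Inner;
// impl Report for Inner {
//     fn report(&self) {}
// }
//
// fn reporter(inner: &Arc<Inner>) -> Arc<dyn Report> {
//     inner.clone() // `Arc<Inner>` coerces to `Arc<dyn Report>`
// }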

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times the send channel was full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Time the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
    /// Duration spent in divider operations.
    pub(crate) divider_cost: Duration,
}

/// Scans memtable ranges at `index`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        file_or_mem_index = %index.index,
        row_group_index = %index.row_group_index,
        source = "mem"
    )
)]
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
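
// The `try_stream!` macro (from the `async-stream` crate) used above turns an
// imperative body into an `impl Stream<Item = Result<...>>`: every `yield`
// emits an `Ok` item and any error propagated with `?` ends the stream with an
// `Err` item. A minimal, self-contained sketch (hypothetical names, not part
// of this module):
//
// use async_stream::try_stream;
// use futures::Stream;
//
// fn count_up(to: u32) -> impl Stream<Item = Result<u32, String>> {
//     try_stream! {
//         for i in 0..to {
//             yield i; // emitted as `Ok(i)`
//         }
//     }
// }
//
// // Consumers drain it like any other stream, e.g. with `futures::StreamExt`:
// // while let Some(item) = stream.next().await { /* item: Result<u32, String> */ }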

/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = "mem_flat"
    )
)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Files with row count greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Threshold on the number of series for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum batch size after splitting. The threshold is below 60 because a series
/// may only have 60 samples per hour (one sample per minute).
const BATCH_SIZE_THRESHOLD: u64 = 50;

/// Returns true if splitting flat record batches may improve merge performance.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> bool {
    // Number of files to split and scan.
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    // Checks each file range, returns early if any range is not splittable.
    // For mem ranges, we collect the total number of rows and series because the number of rows in a
    // mem range may be too small.
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            // This is a mem range.
            let memtable = &stream_ctx.input.memtables[index.index];
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // This is a file range.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
                continue;
            }
            debug_assert!(file.meta_ref().num_rows > 0);
            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
                // We can't split batches in a file.
                return false;
            } else {
                num_files_to_split += 1;
            }
        }
        // Skips non-file and non-mem ranges.
    }

    if num_files_to_split > 0 {
        // We mainly consider file ranges because they have enough data for sampling.
        true
    } else if num_mem_series > 0 && num_mem_rows > 0 {
        // If we don't have files to scan, we check whether to split by the memtable.
        can_split_series(num_mem_rows as u64, num_mem_series as u64)
    } else {
        false
    }
}

fn can_split_series(num_rows: u64, num_series: u64) -> bool {
    assert!(num_series > 0);
    assert!(num_rows > 0);

    // It doesn't have too many series or it will have enough rows for each batch.
    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
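
// Worked example of the heuristic under the constants above (illustrative
// values only): a file with 1_000_000 rows and 20_000 series exceeds
// NUM_SERIES_THRESHOLD (10240), but 1_000_000 / 20_000 = 50 rows per series
// still reaches BATCH_SIZE_THRESHOLD, so it is splittable. With 500_000 rows
// and the same 20_000 series, each batch would average only 25 rows, so
// splitting is rejected:
//
// assert!(can_split_series(1_000_000, 20_000));
// assert!(!can_split_series(500_000, 20_000));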

/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
/// based on the `explain_verbose` flag.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
    if explain_verbose {
        ReaderFilterMetrics {
            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
            ..Default::default()
        }
    } else {
        ReaderFilterMetrics::default()
    }
}

/// Scans file ranges at `index`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}

/// Scans file ranges at `index` using flat reader that returns RecordBatch.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}

/// Builds the stream that scans the input [`FileRange`]s.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();

                // Update per-file metrics if tracking is enabled
                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map
                        .entry(file_id)
                        .or_insert_with(FileScanMetrics::default);

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
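
// Note on the `Source::PruneReader` round-trip above: the reader is moved into
// the `Source` enum to drive the loop, then matched back out once the loop
// ends so its accumulated metrics can be harvested without cloning. A minimal
// sketch of the ownership round-trip (hypothetical names, not part of this
// module):
//
// enum Holder {
//     Reader(String), // stands in for a reader that accumulates metrics
// }
//
// let holder = Holder::Reader("metrics".to_string()); // reader moved in
// // ... drive the scan through `holder` ...
// let Holder::Reader(reader) = holder; // moved back out after the loop
// assert_eq!(reader, "metrics");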

/// Builds the stream that scans the input [`FileRange`]s using a flat reader that returns RecordBatch.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
1434            let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue};
1435            let build_cost = build_reader_start.elapsed();
1436            part_metrics.inc_build_reader_cost(build_cost);
1437
1438            let may_compat = range
1439                .compat_batch()
1440                .map(|compat| {
1441                    compat.as_flat().context(UnexpectedSnafu {
1442                        reason: "Invalid compat for flat format",
1443                    })
1444                })
1445                .transpose()?;
1446
1447            let mapper = range.compaction_projection_mapper();
1448            while let Some(record_batch) = reader.next_batch()? {
1449                let record_batch = if let Some(mapper) = mapper {
1450                    let batch = mapper.project(record_batch)?;
1451                    batch
1452                } else {
1453                    record_batch
1454                };
1455
1456                if let Some(flat_compat) = may_compat {
1457                    let batch = flat_compat.compat(record_batch)?;
1458                    yield batch;
1459                } else {
1460                    yield record_batch;
1461                }
1462            }
1463
1464            let prune_metrics = reader.metrics();
1465
1466            // Update per-file metrics if tracking is enabled
1467            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1468                let file_id = range.file_handle().file_id();
1469                let file_metrics = file_metrics_map
1470                    .entry(file_id)
1471                    .or_insert_with(FileScanMetrics::default);
1472
1473                file_metrics.num_ranges += 1;
1474                file_metrics.num_rows += prune_metrics.num_rows;
1475                file_metrics.build_reader_cost += build_cost;
1476                file_metrics.scan_cost += prune_metrics.scan_cost;
1477            }
1478
1479            reader_metrics.merge_from(&prune_metrics);
1480        }
1481
1482        // Reports metrics.
1483        reader_metrics.observe_rows(read_type);
1484        reader_metrics.filter_metrics.observe();
1485        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1486    }
1487}
1488
/// Builds the stream that scans the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

/// Scans a range that is neither a memtable nor a file range. This currently
/// only covers extension ranges, which are available under the `enterprise`
/// feature; without the feature, the call always fails.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

/// Flat-format counterpart of [`maybe_scan_other_ranges`]. Extension ranges
/// are not yet supported in the flat format, so this always fails.
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}

/// A stream wrapper that splits record batches from an inner stream so that
/// timestamps in each output batch are non-decreasing (see [`split_record_batch`]).
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    batches: VecDeque<RecordBatch>,
}

impl<S> SplitRecordBatchStream<S> {
    /// Creates a new splitting stream wrapper.
    pub(crate) fn new(inner: S) -> Self {
        Self {
            inner,
            batches: VecDeque::new(),
        }
    }
}

impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // First, returns a buffered split batch if there is one.
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Polls the inner stream for the next batch.
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Splits the batch, buffers the results and loops back to return
            // the first split batch.
            split_record_batch(record_batch, &mut self.batches);
        }
    }
}
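// A minimal usage sketch (the inner stream here is illustrative): wrapping any
// `Stream<Item = Result<RecordBatch>> + Unpin` yields the same rows re-chunked
// so each batch is non-decreasing in time. Since `split_record_batch` pushes
// nothing for an empty batch, the wrapper also filters empty batches out of
// the inner stream:
//
//     let sorted_chunks = SplitRecordBatchStream::new(record_batch_stream);
//     // Consume with `futures::TryStreamExt::try_next` like any other stream.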

/// Splits the batch at positions where the timestamp decreases and appends the
/// resulting slices to `batches`, so every slice has non-decreasing timestamps.
/// Empty input batches are discarded.
///
/// For example, timestamps `[1, 2, 2, 1, 3]` decrease between rows 2 and 3, so
/// the batch splits into rows `[0, 3)` and `[3, 5)`.
///
/// # Panics
/// Panics if the timestamp array is invalid.
pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
    let batch_rows = record_batch.num_rows();
    if batch_rows == 0 {
        return;
    }
    if batch_rows < 2 {
        batches.push_back(record_batch);
        return;
    }

    let time_index_pos = time_index_column_index(record_batch.num_columns());
    let timestamps = record_batch.column(time_index_pos);
    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
    // Records a split offset before every row whose timestamp is smaller than
    // its predecessor's.
    let mut offsets = Vec::with_capacity(16);
    offsets.push(0);
    let values = ts_values.values();
    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
        if value > values[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(values.len());

    // Splits the batch by offsets.
    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
        let end = offsets[i + 1];
        batches.push_back(record_batch.slice(start, end - start));
    }
}
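
// The offset rule above can be checked in isolation. The helper below is a
// hypothetical illustration (not used by this module) that applies the same
// rule to a plain slice of timestamp values.
#[cfg(test)]
mod split_offset_tests {
    /// Mirrors the offset computation in `split_record_batch`: a new slice
    /// starts wherever a value is greater than its successor.
    fn split_offsets(values: &[i64]) -> Vec<usize> {
        let mut offsets = vec![0];
        for i in 0..values.len().saturating_sub(1) {
            if values[i] > values[i + 1] {
                offsets.push(i + 1);
            }
        }
        offsets.push(values.len());
        offsets
    }

    #[test]
    fn splits_at_decreasing_timestamps() {
        // Decreases between rows 2 and 3: two slices, [0, 3) and [3, 5).
        assert_eq!(split_offsets(&[1, 2, 2, 1, 3]), vec![0, 3, 5]);
        // Non-decreasing input stays in one slice.
        assert_eq!(split_offsets(&[1, 1, 2]), vec![0, 3]);
        // Strictly decreasing input splits into singleton slices.
        assert_eq!(split_offsets(&[3, 2, 1]), vec![0, 1, 2, 3]);
    }
}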