// mito2/read/scan_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for scanners.
16
17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
51use crate::sst::parquet::file_range::FileRange;
52use crate::sst::parquet::flat_format::time_index_column_index;
53use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
54use crate::sst::parquet::row_group::ParquetFetchMetrics;
55
/// Per-file scan metrics.
///
/// All fields are cumulative: `merge_from()` adds counters and costs from
/// other instances, so one value may aggregate several ranges of the same file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}
70
71impl fmt::Debug for FileScanMetrics {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
74
75        if self.num_ranges > 0 {
76            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
77        }
78        if self.num_rows > 0 {
79            write!(f, ", \"num_rows\":{}", self.num_rows)?;
80        }
81        if !self.build_reader_cost.is_zero() {
82            write!(
83                f,
84                ", \"build_reader_cost\":\"{:?}\"",
85                self.build_reader_cost
86            )?;
87        }
88        if !self.scan_cost.is_zero() {
89            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
90        }
91
92        write!(f, "}}")
93    }
94}
95
96impl FileScanMetrics {
97    /// Merges another FileMetrics into this one.
98    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
99        self.num_ranges += other.num_ranges;
100        self.num_rows += other.num_rows;
101        self.build_part_cost += other.build_part_cost;
102        self.build_reader_cost += other.build_reader_cost;
103        self.scan_cost += other.scan_cost;
104    }
105}
106
/// Verbose scan metrics for a partition.
///
/// Aggregated under a mutex while the partition runs and dumped to the logs
/// when it finishes (see the `Debug` impl, which omits zero/empty fields).
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeout in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// The stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics (populated lazily when sub-readers report them):
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
}
209
/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
    /// Sum of build-part, build-reader and scan costs for one file; the
    /// only field considered by the ordering impls.
    total_cost: Duration,
    /// Identifier of the file the metrics belong to.
    file_id: RegionFileId,
    /// Borrowed per-file metrics (owned by the map being iterated).
    metrics: &'a FileScanMetrics,
}
217
218impl Ord for CompareCostReverse<'_> {
219    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
220        // Reverse comparison: smaller costs are "greater"
221        other.total_cost.cmp(&self.total_cost)
222    }
223}
224
225impl PartialOrd for CompareCostReverse<'_> {
226    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
227        Some(self.cmp(other))
228    }
229}
230
// `Duration` equality is total, so the `PartialEq` below is a valid `Eq`.
impl Eq for CompareCostReverse<'_> {}
232
233impl PartialEq for CompareCostReverse<'_> {
234    fn eq(&self, other: &Self) -> bool {
235        self.total_cost == other.total_cost
236    }
237}
238
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a JSON-like object for explain/log output.
    ///
    /// Core metrics are always written; zero-valued counters and empty
    /// optional metric groups are skipped to keep the output compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to `ScanMetricsSet`
        // without updating this formatter becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
        } = self;

        // Write core metrics (always present, regardless of value)
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10.
            // `CompareCostReverse` inverts the ordering, so `peek()` returns the
            // entry with the smallest total cost among those kept.
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // If current cost is higher than the minimum in our top-10, replace it
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // With the reversed ordering, the sorted vec starts at the most
            // expensive file, so the output is in descending cost order.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    ///
    /// Builder-style helper used right after `default()` to seed the
    /// preparation time measured before the metrics set existed.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Exhaustive destructuring: a new `ScannerMetrics` field cannot be
        // silently dropped from this merge.
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Destructure (including the nested filter metrics) so every field
        // is accounted for at compile time.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Merge optional verbose metrics, creating the destination lazily so
        // absent groups stay `None` and are omitted from the debug output.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        // Metadata cache metrics are not optional on the reader side.
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);
    }

    /// Merges per-file metrics.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Sets distributor metrics.
    ///
    /// Despite the name, values are accumulated (`+=`), so repeated calls add up.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Observes metrics.
    ///
    /// Exports the aggregated durations and counters to the global Prometheus
    /// metrics; called once when the partition metrics are dropped.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
652
/// Shared state behind [PartitionMetrics].
struct PartitionMetricsInner {
    /// Id of the region being scanned.
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operation.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented on creation, decremented on drop.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}
681
682impl PartitionMetricsInner {
683    fn on_finish(&self, stream_eof: bool) {
684        let mut metrics = self.metrics.lock().unwrap();
685        if metrics.total_cost.is_zero() {
686            metrics.total_cost = self.query_start.elapsed();
687        }
688        if !metrics.stream_eof {
689            metrics.stream_eof = stream_eof;
690        }
691    }
692}
693
694impl MergeMetricsReport for PartitionMetricsInner {
695    fn report(&self, metrics: &mut MergeMetrics) {
696        let mut scan_metrics = self.metrics.lock().unwrap();
697        // Merge the metrics into scan_metrics
698        scan_metrics.merge_metrics.merge(metrics);
699
700        // Reset the input metrics
701        *metrics = MergeMetrics::default();
702    }
703}
704
705impl DedupMetricsReport for PartitionMetricsInner {
706    fn report(&self, metrics: &mut DedupMetrics) {
707        let mut scan_metrics = self.metrics.lock().unwrap();
708        // Merge the metrics into scan_metrics
709        scan_metrics.dedup_metrics.merge(metrics);
710
711        // Reset the input metrics
712        *metrics = DedupMetrics::default();
713    }
714}
715
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Finalize total_cost in case the stream was dropped before finishing;
        // `false` keeps stream_eof unset unless it was already reported.
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        // Export the aggregated counters to the global Prometheus metrics.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Verbose explains log at info level so the metrics are visible
        // without enabling debug logging; otherwise log at debug level.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}
744
/// List of PartitionMetrics.
///
/// Indexed by partition number; slots stay `None` until the corresponding
/// partition registers its metrics via `set`.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
748
749impl PartitionMetricsList {
750    /// Sets a new [PartitionMetrics] at the specified partition.
751    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
752        let mut list = self.0.lock().unwrap();
753        if list.len() <= partition {
754            list.resize(partition + 1, None);
755        }
756        list[partition] = Some(metrics);
757    }
758
759    /// Format verbose metrics for each partition for explain.
760    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
761        let list = self.0.lock().unwrap();
762        write!(f, ", \"metrics_per_partition\": ")?;
763        f.debug_list()
764            .entries(list.iter().filter_map(|p| p.as_ref()))
765            .finish()?;
766        write!(f, "}}")
767    }
768}
769
/// Metrics while reading a partition.
///
/// Cheap to clone: all state lives in a shared, internally-locked inner
/// struct that logs and exports the metrics when the last clone is dropped.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
773
impl PartitionMetrics {
    /// Creates metrics for one partition of a scan.
    ///
    /// Increments the in-progress scan gauge and records the time elapsed since
    /// `query_start` as the prepare-scan cost. The per-partition DataFusion
    /// metrics are registered in `metrics_set` so they appear in `EXPLAIN ANALYZE`.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the time from query start until the stream is polled for the first time.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` to the number of memtable ranges scanned by this partition.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the number of file ranges scanned by this partition.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to the DataFusion `elapsed_compute` metric, skipping zero
    /// durations to avoid touching the metric needlessly.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds `cost` to the batch-conversion cost; conversion counts as compute time.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_reader_cost`.
    ///
    /// `per_file_metrics` is only provided when verbose explain is enabled.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        // Merge per-file metrics if provided
        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}
908
909impl fmt::Debug for PartitionMetrics {
910    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911        let metrics = self.0.metrics.lock().unwrap();
912        write!(
913            f,
914            r#"{{"partition":{}, "metrics":{:?}}}"#,
915            self.0.partition, metrics
916        )
917    }
918}
919
/// Metrics for the series distributor.
///
/// Collected by the series distributor task and folded into the partition
/// metrics via [PartitionMetrics::set_distributor_metrics].
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeout in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the series distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the series distributor to yield.
    pub(crate) yield_cost: Duration,
}
936
937/// Scans memtable ranges at `index`.
938#[tracing::instrument(
939    skip_all,
940    fields(
941        region_id = %stream_ctx.input.region_metadata().region_id,
942        file_or_mem_index = %index.index,
943        row_group_index = %index.row_group_index,
944        source = "mem"
945    )
946)]
947pub(crate) fn scan_mem_ranges(
948    stream_ctx: Arc<StreamContext>,
949    part_metrics: PartitionMetrics,
950    index: RowGroupIndex,
951    time_range: FileTimeRange,
952) -> impl Stream<Item = Result<Batch>> {
953    try_stream! {
954        let ranges = stream_ctx.input.build_mem_ranges(index);
955        part_metrics.inc_num_mem_ranges(ranges.len());
956        for range in ranges {
957            let build_reader_start = Instant::now();
958            let mem_scan_metrics = Some(MemScanMetrics::default());
959            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
960            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
961
962            let mut source = Source::Iter(iter);
963            while let Some(batch) = source.next_batch().await? {
964                yield batch;
965            }
966
967            // Report the memtable scan metrics to partition metrics
968            if let Some(ref metrics) = mem_scan_metrics {
969                let data = metrics.data();
970                part_metrics.report_mem_scan_metrics(&data);
971            }
972        }
973    }
974}
975
976/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
977#[tracing::instrument(
978    skip_all,
979    fields(
980        region_id = %stream_ctx.input.region_metadata().region_id,
981        row_group_index = %index.index,
982        source = "mem_flat"
983    )
984)]
985pub(crate) fn scan_flat_mem_ranges(
986    stream_ctx: Arc<StreamContext>,
987    part_metrics: PartitionMetrics,
988    index: RowGroupIndex,
989) -> impl Stream<Item = Result<RecordBatch>> {
990    try_stream! {
991        let ranges = stream_ctx.input.build_mem_ranges(index);
992        part_metrics.inc_num_mem_ranges(ranges.len());
993        for range in ranges {
994            let build_reader_start = Instant::now();
995            let mem_scan_metrics = Some(MemScanMetrics::default());
996            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
997            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
998
999            while let Some(record_batch) = iter.next().transpose()? {
1000                yield record_batch;
1001            }
1002
1003            // Report the memtable scan metrics to partition metrics
1004            if let Some(ref metrics) = mem_scan_metrics {
1005                let data = metrics.data();
1006                part_metrics.report_mem_scan_metrics(&data);
1007            }
1008        }
1009    }
1010}
1011
/// Files with row count greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows per series required to split batches.
/// Kept below 60 because a series may only have 60 samples per hour
/// (one sample per minute).
const BATCH_SIZE_THRESHOLD: u64 = 50;
1019
1020/// Returns true if splitting flat record batches may improve merge performance.
1021pub(crate) fn should_split_flat_batches_for_merge(
1022    stream_ctx: &Arc<StreamContext>,
1023    range_meta: &RangeMeta,
1024) -> bool {
1025    // Number of files to split and scan.
1026    let mut num_files_to_split = 0;
1027    let mut num_mem_rows = 0;
1028    let mut num_mem_series = 0;
1029    // Checks each file range, returns early if any range is not splittable.
1030    // For mem ranges, we collect the total number of rows and series because the number of rows in a
1031    // mem range may be too small.
1032    for index in &range_meta.row_group_indices {
1033        if stream_ctx.is_mem_range_index(*index) {
1034            let memtable = &stream_ctx.input.memtables[index.index];
1035            // Is mem range
1036            let stats = memtable.stats();
1037            num_mem_rows += stats.num_rows();
1038            num_mem_series += stats.series_count();
1039        } else if stream_ctx.is_file_range_index(*index) {
1040            // This is a file range.
1041            let file_index = index.index - stream_ctx.input.num_memtables();
1042            let file = &stream_ctx.input.files[file_index];
1043            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
1044                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
1045                continue;
1046            }
1047            debug_assert!(file.meta_ref().num_rows > 0);
1048            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
1049                // We can't split batches in a file.
1050                return false;
1051            } else {
1052                num_files_to_split += 1;
1053            }
1054        }
1055        // Skips non-file and non-mem ranges.
1056    }
1057
1058    if num_files_to_split > 0 {
1059        // We mainly consider file ranges because they have enough data for sampling.
1060        true
1061    } else if num_mem_series > 0 && num_mem_rows > 0 {
1062        // If we don't have files to scan, we check whether to split by the memtable.
1063        can_split_series(num_mem_rows as u64, num_mem_series as u64)
1064    } else {
1065        false
1066    }
1067}
1068
1069fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1070    assert!(num_series > 0);
1071    assert!(num_rows > 0);
1072
1073    // It doesn't have too many series or it will have enough rows for each batch.
1074    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1075}
1076
1077/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
1078/// based on the `explain_verbose` flag.
1079fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1080    if explain_verbose {
1081        ReaderFilterMetrics {
1082            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1083            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1084            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1085            ..Default::default()
1086        }
1087    } else {
1088        ReaderFilterMetrics::default()
1089    }
1090}
1091
1092/// Scans file ranges at `index`.
1093#[tracing::instrument(
1094    skip_all,
1095    fields(
1096        region_id = %stream_ctx.input.region_metadata().region_id,
1097        row_group_index = %index.index,
1098        source = read_type
1099    )
1100)]
1101pub(crate) async fn scan_file_ranges(
1102    stream_ctx: Arc<StreamContext>,
1103    part_metrics: PartitionMetrics,
1104    index: RowGroupIndex,
1105    read_type: &'static str,
1106    range_builder: Arc<RangeBuilderList>,
1107) -> Result<impl Stream<Item = Result<Batch>>> {
1108    let mut reader_metrics = ReaderMetrics {
1109        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1110        ..Default::default()
1111    };
1112    let ranges = range_builder
1113        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
1114        .await?;
1115    part_metrics.inc_num_file_ranges(ranges.len());
1116    part_metrics.merge_reader_metrics(&reader_metrics, None);
1117
1118    // Creates initial per-file metrics with build_part_cost.
1119    let init_per_file_metrics = if part_metrics.explain_verbose() {
1120        let file = stream_ctx.input.file_from_index(index);
1121        let file_id = file.file_id();
1122
1123        let mut map = HashMap::new();
1124        map.insert(
1125            file_id,
1126            FileScanMetrics {
1127                build_part_cost: reader_metrics.build_cost,
1128                ..Default::default()
1129            },
1130        );
1131        Some(map)
1132    } else {
1133        None
1134    };
1135
1136    Ok(build_file_range_scan_stream(
1137        stream_ctx,
1138        part_metrics,
1139        read_type,
1140        ranges,
1141        init_per_file_metrics,
1142    ))
1143}
1144
1145/// Scans file ranges at `index` using flat reader that returns RecordBatch.
1146#[tracing::instrument(
1147    skip_all,
1148    fields(
1149        region_id = %stream_ctx.input.region_metadata().region_id,
1150        row_group_index = %index.index,
1151        source = read_type
1152    )
1153)]
1154pub(crate) async fn scan_flat_file_ranges(
1155    stream_ctx: Arc<StreamContext>,
1156    part_metrics: PartitionMetrics,
1157    index: RowGroupIndex,
1158    read_type: &'static str,
1159    range_builder: Arc<RangeBuilderList>,
1160) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1161    let mut reader_metrics = ReaderMetrics {
1162        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1163        ..Default::default()
1164    };
1165    let ranges = range_builder
1166        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
1167        .await?;
1168    part_metrics.inc_num_file_ranges(ranges.len());
1169    part_metrics.merge_reader_metrics(&reader_metrics, None);
1170
1171    // Creates initial per-file metrics with build_part_cost.
1172    let init_per_file_metrics = if part_metrics.explain_verbose() {
1173        let file = stream_ctx.input.file_from_index(index);
1174        let file_id = file.file_id();
1175
1176        let mut map = HashMap::new();
1177        map.insert(
1178            file_id,
1179            FileScanMetrics {
1180                build_part_cost: reader_metrics.build_cost,
1181                ..Default::default()
1182            },
1183        );
1184        Some(map)
1185    } else {
1186        None
1187    };
1188
1189    Ok(build_flat_file_range_scan_stream(
1190        stream_ctx,
1191        part_metrics,
1192        read_type,
1193        ranges,
1194        init_per_file_metrics,
1195    ))
1196}
1197
1198/// Build the stream of scanning the input [`FileRange`]s.
1199#[tracing::instrument(
1200    skip_all,
1201    fields(read_type = read_type, range_count = ranges.len())
1202)]
1203pub fn build_file_range_scan_stream(
1204    stream_ctx: Arc<StreamContext>,
1205    part_metrics: PartitionMetrics,
1206    read_type: &'static str,
1207    ranges: SmallVec<[FileRange; 2]>,
1208    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1209) -> impl Stream<Item = Result<Batch>> {
1210    try_stream! {
1211        let fetch_metrics = if part_metrics.explain_verbose() {
1212            Some(Arc::new(ParquetFetchMetrics::default()))
1213        } else {
1214            None
1215        };
1216        let reader_metrics = &mut ReaderMetrics {
1217            fetch_metrics: fetch_metrics.clone(),
1218            ..Default::default()
1219        };
1220        for range in ranges {
1221            let build_reader_start = Instant::now();
1222            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
1223                continue;
1224            };
1225            let build_cost = build_reader_start.elapsed();
1226            part_metrics.inc_build_reader_cost(build_cost);
1227            let compat_batch = range.compat_batch();
1228            let mut source = Source::PruneReader(reader);
1229            while let Some(mut batch) = source.next_batch().await? {
1230                if let Some(compact_batch) = compat_batch {
1231                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
1232                }
1233                yield batch;
1234            }
1235            if let Source::PruneReader(reader) = source {
1236                let prune_metrics = reader.metrics();
1237
1238                // Update per-file metrics if tracking is enabled
1239                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1240                    let file_id = range.file_handle().file_id();
1241                    let file_metrics = file_metrics_map
1242                        .entry(file_id)
1243                        .or_insert_with(FileScanMetrics::default);
1244
1245                    file_metrics.num_ranges += 1;
1246                    file_metrics.num_rows += prune_metrics.num_rows;
1247                    file_metrics.build_reader_cost += build_cost;
1248                    file_metrics.scan_cost += prune_metrics.scan_cost;
1249                }
1250
1251                reader_metrics.merge_from(&prune_metrics);
1252            }
1253        }
1254
1255        // Reports metrics.
1256        reader_metrics.observe_rows(read_type);
1257        reader_metrics.filter_metrics.observe();
1258        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1259    }
1260}
1261
1262/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
1263#[tracing::instrument(
1264    skip_all,
1265    fields(read_type = read_type, range_count = ranges.len())
1266)]
1267pub fn build_flat_file_range_scan_stream(
1268    _stream_ctx: Arc<StreamContext>,
1269    part_metrics: PartitionMetrics,
1270    read_type: &'static str,
1271    ranges: SmallVec<[FileRange; 2]>,
1272    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1273) -> impl Stream<Item = Result<RecordBatch>> {
1274    try_stream! {
1275        let fetch_metrics = if part_metrics.explain_verbose() {
1276            Some(Arc::new(ParquetFetchMetrics::default()))
1277        } else {
1278            None
1279        };
1280        let reader_metrics = &mut ReaderMetrics {
1281            fetch_metrics: fetch_metrics.clone(),
1282            ..Default::default()
1283        };
1284        for range in ranges {
1285            let build_reader_start = Instant::now();
1286            let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else{continue};
1287            let build_cost = build_reader_start.elapsed();
1288            part_metrics.inc_build_reader_cost(build_cost);
1289
1290            let may_compat = range
1291                .compat_batch()
1292                .map(|compat| {
1293                    compat.as_flat().context(UnexpectedSnafu {
1294                        reason: "Invalid compat for flat format",
1295                    })
1296                })
1297                .transpose()?;
1298            while let Some(record_batch) = reader.next_batch()? {
1299                if let Some(flat_compat) = may_compat {
1300                    let batch = flat_compat.compat(record_batch)?;
1301                    yield batch;
1302                } else {
1303                    yield record_batch;
1304                }
1305            }
1306
1307            let prune_metrics = reader.metrics();
1308
1309            // Update per-file metrics if tracking is enabled
1310            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1311                let file_id = range.file_handle().file_id();
1312                let file_metrics = file_metrics_map
1313                    .entry(file_id)
1314                    .or_insert_with(FileScanMetrics::default);
1315
1316                file_metrics.num_ranges += 1;
1317                file_metrics.num_rows += prune_metrics.num_rows;
1318                file_metrics.build_reader_cost += build_cost;
1319                file_metrics.scan_cost += prune_metrics.scan_cost;
1320            }
1321
1322            reader_metrics.merge_from(&prune_metrics);
1323        }
1324
1325        // Reports metrics.
1326        reader_metrics.observe_rows(read_type);
1327        reader_metrics.filter_metrics.observe();
1328        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1329    }
1330}
1331
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
///
/// Only available with the `enterprise` feature; delegates the actual read to
/// the extension range's own reader implementation.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
1349
/// Scans ranges that are neither memtable nor file ranges.
///
/// With the `enterprise` feature this forwards to [`scan_extension_range`];
/// otherwise such ranges are unexpected and an error is returned.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        // Silence unused-parameter warnings in the non-enterprise build.
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}
1372
1373pub(crate) async fn maybe_scan_flat_other_ranges(
1374    context: &Arc<StreamContext>,
1375    index: RowGroupIndex,
1376    metrics: &PartitionMetrics,
1377) -> Result<BoxedRecordBatchStream> {
1378    let _ = context;
1379    let _ = index;
1380    let _ = metrics;
1381
1382    crate::error::UnexpectedSnafu {
1383        reason: "no other ranges scannable in flat format",
1384    }
1385    .fail()
1386}
1387
/// A stream wrapper that splits record batches from an inner stream.
///
/// Each batch pulled from `inner` is split at points where timestamps decrease
/// (see [split_record_batch]); the resulting pieces are buffered and yielded
/// one at a time.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    batches: VecDeque<RecordBatch>,
}

impl<S> SplitRecordBatchStream<S> {
    /// Creates a new splitting stream wrapper.
    pub(crate) fn new(inner: S) -> Self {
        Self {
            inner,
            batches: VecDeque::new(),
        }
    }
}
1405
impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    /// Drains the buffer of already-split batches before polling the inner
    /// stream; each new inner batch is split and buffered, then the loop
    /// yields the first piece.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // First, check if we have buffered split batches
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Poll the inner stream for the next batch
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                // Inner stream exhausted and buffer empty: the stream ends.
                None => return Poll::Ready(None),
            };

            // Split the batch and buffer the results
            split_record_batch(record_batch, &mut self.batches);
            // Continue the loop to return the first split batch
        }
    }
}
1432
1433/// Splits the batch by timestamps.
1434///
1435/// # Panics
1436/// Panics if the timestamp array is invalid.
1437pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1438    let batch_rows = record_batch.num_rows();
1439    if batch_rows == 0 {
1440        return;
1441    }
1442    if batch_rows < 2 {
1443        batches.push_back(record_batch);
1444        return;
1445    }
1446
1447    let time_index_pos = time_index_column_index(record_batch.num_columns());
1448    let timestamps = record_batch.column(time_index_pos);
1449    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1450    let mut offsets = Vec::with_capacity(16);
1451    offsets.push(0);
1452    let values = ts_values.values();
1453    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1454        if value > values[i + 1] {
1455            offsets.push(i + 1);
1456        }
1457    }
1458    offsets.push(values.len());
1459
1460    // Splits the batch by offsets.
1461    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1462        let end = offsets[i + 1];
1463        let rows_in_batch = end - start;
1464        batches.push_back(record_batch.slice(start, rows_in_batch));
1465    }
1466}