mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Per-file scan metrics.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}

impl fmt::Debug for FileScanMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;

        if self.num_ranges > 0 {
            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
        }
        if self.num_rows > 0 {
            write!(f, ", \"num_rows\":{}", self.num_rows)?;
        }
        if !self.build_reader_cost.is_zero() {
            write!(
                f,
                ", \"build_reader_cost\":\"{:?}\"",
                self.build_reader_cost
            )?;
        }
        if !self.scan_cost.is_zero() {
            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
        }

        write!(f, "}}")
    }
}

impl FileScanMetrics {
    /// Merges another [FileScanMetrics] into this one.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        self.num_ranges += other.num_ranges;
        self.num_rows += other.num_rows;
        self.build_part_cost += other.build_part_cost;
        self.build_reader_cost += other.build_reader_cost;
        self.scan_cost += other.scan_cost;
    }
}
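
// The following test module is a minimal sketch (not part of the original
// file) showing how `merge_from` accumulates per-file counters and how the
// compact JSON-style `Debug` output above omits zero-valued fields.
#[cfg(test)]
mod file_scan_metrics_tests {
    use super::*;

    #[test]
    fn merge_accumulates_and_debug_skips_zeros() {
        let mut acc = FileScanMetrics {
            num_ranges: 1,
            num_rows: 100,
            build_part_cost: Duration::from_millis(2),
            ..Default::default()
        };
        let other = FileScanMetrics {
            num_ranges: 2,
            num_rows: 300,
            scan_cost: Duration::from_millis(5),
            ..Default::default()
        };
        acc.merge_from(&other);
        assert_eq!(acc.num_ranges, 3);
        assert_eq!(acc.num_rows, 400);
        assert_eq!(acc.scan_cost, Duration::from_millis(5));

        // `build_reader_cost` is still zero, so Debug omits it.
        let rendered = format!("{:?}", acc);
        assert!(rendered.contains("\"num_rows\":400"));
        assert!(!rendered.contains("build_reader_cost"));
    }
}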

/// Verbose scan metrics for a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows in row groups filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row groups filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row groups filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of sends that found the channel full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    distributor_yield_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when `explain_verbose` is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
}

/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
    total_cost: Duration,
    file_id: RegionFileId,
    metrics: &'a FileScanMetrics,
}

impl Ord for CompareCostReverse<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse comparison: smaller costs are "greater"
        other.total_cost.cmp(&self.total_cost)
    }
}

impl PartialOrd for CompareCostReverse<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for CompareCostReverse<'_> {}

impl PartialEq for CompareCostReverse<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}

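// The following test module is a standalone sketch (not part of the original
// file) of the reverse-ordering trick behind `CompareCostReverse`: wrapping
// the cost so that `BinaryHeap`, a max-heap, acts as a min-heap and keeps the
// top-K most expensive entries. `RevCost` is a hypothetical simplified
// stand-in so the test does not need to construct real `RegionFileId`s.
#[cfg(test)]
mod top_k_heap_tests {
    use std::collections::BinaryHeap;
    use std::time::Duration;

    #[derive(PartialEq, Eq)]
    struct RevCost(Duration);

    impl Ord for RevCost {
        fn cmp(&self, other: &Self) -> std::cmp::Ordering {
            // Reverse comparison: the cheapest entry surfaces at the top of
            // the heap, so it is the first to be evicted.
            other.0.cmp(&self.0)
        }
    }

    impl PartialOrd for RevCost {
        fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }

    #[test]
    fn keeps_top_k_most_expensive() {
        const K: usize = 3;
        let mut heap = BinaryHeap::new();
        for ms in [7u64, 1, 9, 4, 8, 2] {
            let cost = Duration::from_millis(ms);
            if heap.len() < K {
                heap.push(RevCost(cost));
            } else if let Some(min) = heap.peek()
                && cost > min.0
            {
                heap.pop();
                heap.push(RevCost(cost));
            }
        }
        let mut kept: Vec<u64> = heap.into_iter().map(|c| c.0.as_millis() as u64).collect();
        kept.sort_unstable();
        assert_eq!(kept, vec![7, 8, 9]);
    }
}
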
impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // If current cost is higher than the minimum in our top-10, replace it
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Merge optional verbose metrics
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);
    }

    /// Merges per-file metrics.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Observes metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl MergeMetricsReport for PartitionMetricsInner {
    fn report(&self, metrics: &mut MergeMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        // Merge the metrics into scan_metrics
        scan_metrics.merge_metrics.merge(metrics);

        // Reset the input metrics
        *metrics = MergeMetrics::default();
    }
}

impl DedupMetricsReport for PartitionMetricsInner {
    fn report(&self, metrics: &mut DedupMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        // Merge the metrics into scan_metrics
        scan_metrics.dedup_metrics.merge(metrics);

        // Reset the input metrics
        *metrics = DedupMetrics::default();
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}

/// List of [PartitionMetrics].
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats verbose metrics for each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

/// Metrics while reading a partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_reader_cost`.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        // Merge per-file metrics if provided
        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a [MergeMetricsReport] trait object for reporting merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a [DedupMetricsReport] trait object for reporting dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}
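
// The following test module is a minimal sketch (not part of the original
// file) wiring a `PartitionMetrics` end to end: counters merged through the
// handle show up in the JSON-style `Debug` output. `RegionId::new(1, 0)` and
// the "test" scanner type are arbitrary test values.
#[cfg(test)]
mod partition_metrics_tests {
    use super::*;

    #[test]
    fn debug_reflects_merged_counters() {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let metrics = PartitionMetrics::new(
            RegionId::new(1, 0),
            0,
            "test",
            Instant::now(),
            false,
            &metrics_set,
        );
        metrics.inc_num_file_ranges(3);
        metrics.inc_build_reader_cost(Duration::from_millis(1));

        // Both fields are always present in the verbose Debug rendering.
        let rendered = format!("{:?}", metrics);
        assert!(rendered.contains("\"partition\":0"));
        assert!(rendered.contains("\"num_file_ranges\":3"));
    }
}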

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends that found the channel full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans memtable ranges at `index` using the flat format, which returns RecordBatch.
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Files with row count greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum batch size after splitting. The threshold is below 60 because a series may only
/// have 60 samples per hour (one per minute).
const BATCH_SIZE_THRESHOLD: u64 = 50;

/// Returns true if splitting flat record batches may improve merge performance.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> bool {
    // Number of files to split and scan.
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    // Checks each file range and returns early if any range is not splittable.
    // For mem ranges, we collect the total number of rows and series because the number of rows
    // in a single mem range may be too small.
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            // This is a mem range.
            let memtable = &stream_ctx.input.memtables[index.index];
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // This is a file range.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
                continue;
            }
            debug_assert!(file.meta_ref().num_rows > 0);
            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
                // We can't split batches in a file.
                return false;
            } else {
                num_files_to_split += 1;
            }
        }
        // Skips non-file and non-mem ranges.
    }

    if num_files_to_split > 0 {
        // We mainly consider file ranges because they have enough data for sampling.
        true
    } else if num_mem_series > 0 && num_mem_rows > 0 {
        // If we don't have files to scan, we check whether to split by the memtable.
        can_split_series(num_mem_rows as u64, num_mem_series as u64)
    } else {
        false
    }
}

fn can_split_series(num_rows: u64, num_series: u64) -> bool {
    assert!(num_series > 0);
    assert!(num_rows > 0);

    // Either there are not too many series, or each batch will still have enough rows on average.
    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
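
// The following test module is a minimal sketch (not part of the original
// file) exercising the two branches of `can_split_series`: few series are
// always splittable, while many series split only when the average rows per
// series reaches `BATCH_SIZE_THRESHOLD`.
#[cfg(test)]
mod can_split_series_tests {
    use super::*;

    #[test]
    fn splits_when_few_series_or_large_batches() {
        // Fewer than NUM_SERIES_THRESHOLD (10240) series: always splittable.
        assert!(can_split_series(100, 5));
        // Many series, but 1_000_000 / 20_000 = 50 rows per series on
        // average, which meets BATCH_SIZE_THRESHOLD (50).
        assert!(can_split_series(1_000_000, 20_000));
        // Many series and only 5 rows per series: splitting would produce
        // batches that are too small.
        assert!(!can_split_series(100_000, 20_000));
    }
}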

/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
/// based on the `explain_verbose` flag.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
    if explain_verbose {
        ReaderFilterMetrics {
            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
            ..Default::default()
        }
    } else {
        ReaderFilterMetrics::default()
    }
}

/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}

/// Scans file ranges at `index` using the flat reader, which returns RecordBatch.
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}

/// Builds the stream that scans the input [`FileRange`]s.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();

                // Update per-file metrics if tracking is enabled
                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map.entry(file_id).or_default();

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}

/// Builds the stream that scans the input [`FileRange`]s using the flat reader,
/// which returns RecordBatch.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();

            // Update per-file metrics if tracking is enabled
            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                let file_id = range.file_handle().file_id();
                let file_metrics = file_metrics_map.entry(file_id).or_default();

                file_metrics.num_ranges += 1;
                file_metrics.num_rows += prune_metrics.num_rows;
                file_metrics.build_reader_cost += build_cost;
                file_metrics.scan_cost += prune_metrics.scan_cost;
            }

            reader_metrics.merge_from(&prune_metrics);
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}

/// Builds the stream that scans the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}

/// A stream wrapper that splits record batches from an inner stream.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    batches: VecDeque<RecordBatch>,
}

impl<S> SplitRecordBatchStream<S> {
    /// Creates a new splitting stream wrapper.
    pub(crate) fn new(inner: S) -> Self {
        Self {
            inner,
            batches: VecDeque::new(),
        }
    }
}

impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // First, check if we have buffered split batches
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Poll the inner stream for the next batch
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Split the batch and buffer the results
            split_record_batch(record_batch, &mut self.batches);
            // Continue the loop to return the first split batch
        }
    }
}

/// Splits the batch by timestamps.
///
/// # Panics
/// Panics if the timestamp array is invalid.
pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
    let batch_rows = record_batch.num_rows();
    if batch_rows == 0 {
        return;
    }
    if batch_rows < 2 {
        batches.push_back(record_batch);
        return;
    }

    let time_index_pos = time_index_column_index(record_batch.num_columns());
    let timestamps = record_batch.column(time_index_pos);
    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
    let mut offsets = Vec::with_capacity(16);
    offsets.push(0);
    let values = ts_values.values();
    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
        if value > values[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(values.len());

    // Splits the batch by offsets.
    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
        let end = offsets[i + 1];
        let rows_in_batch = end - start;
        batches.push_back(record_batch.slice(start, rows_in_batch));
    }
}
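
// The following test module is a standalone sketch (not part of the original
// file) of the offset computation inside `split_record_batch`: a new slice
// starts wherever the timestamp sequence decreases, so every resulting slice
// is non-decreasing. `split_offsets` is a hypothetical helper operating on
// plain i64 values instead of a RecordBatch timestamp column.
#[cfg(test)]
mod split_offsets_tests {
    fn split_offsets(values: &[i64]) -> Vec<usize> {
        let mut offsets = vec![0];
        for i in 0..values.len().saturating_sub(1) {
            if values[i] > values[i + 1] {
                offsets.push(i + 1);
            }
        }
        offsets.push(values.len());
        offsets
    }

    #[test]
    fn splits_at_timestamp_decreases() {
        // Two descents (5 -> 2 and 9 -> 1) produce three slices:
        // [1, 3, 5], [2, 4, 9], [1].
        let ts = [1, 3, 5, 2, 4, 9, 1];
        assert_eq!(split_offsets(&ts), vec![0, 3, 6, 7]);
    }
}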