mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::pruner::PartitionPruner;
use crate::read::range::{RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;

/// Per-file scan metrics.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}

impl fmt::Debug for FileScanMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;

        if self.num_ranges > 0 {
            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
        }
        if self.num_rows > 0 {
            write!(f, ", \"num_rows\":{}", self.num_rows)?;
        }
        if !self.build_reader_cost.is_zero() {
            write!(
                f,
                ", \"build_reader_cost\":\"{:?}\"",
                self.build_reader_cost
            )?;
        }
        if !self.scan_cost.is_zero() {
            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
        }

        write!(f, "}}")
    }
}
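
// Illustrative output of the `Debug` impl above (values are made up, added as
// an editorial note): fields that are zero are omitted, so a file that was
// pruned before any range was read may print only its build cost.
//
// {"build_part_cost":"1.2ms", "num_ranges":3, "num_rows":24576,
//  "build_reader_cost":"800µs", "scan_cost":"4.5ms"}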

impl FileScanMetrics {
    /// Merges another FileScanMetrics into this one.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        self.num_ranges += other.num_ranges;
        self.num_rows += other.num_rows;
        self.build_part_cost += other.build_part_cost;
        self.build_reader_cost += other.build_reader_cost;
        self.scan_cost += other.scan_cost;
    }
}

/// Verbose scan metrics for a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration to convert [`Batch`]es.
    convert_cost: Option<Time>,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of index result cache hits for fulltext index.
    fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    bloom_filter_cache_miss: usize,
    /// Number of pruner builder cache hits.
    pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pruner_prune_cost: Duration,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of sends that timed out in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of sends that found the channel full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    distributor_yield_cost: Duration,
    /// Duration spent in divider operations.
    distributor_divider_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory usage for file range builders.
    build_ranges_mem_size: isize,
    /// Peak memory usage for file range builders.
    build_ranges_peak_mem_size: isize,
    /// Current number of file range builders.
    num_range_builders: isize,
    /// Peak number of file range builders.
    num_peak_range_builders: isize,
}

/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
    total_cost: Duration,
    file_id: RegionFileId,
    metrics: &'a FileScanMetrics,
}

impl Ord for CompareCostReverse<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse comparison: smaller costs are "greater"
        other.total_cost.cmp(&self.total_cost)
    }
}

impl PartialOrd for CompareCostReverse<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for CompareCostReverse<'_> {}

impl PartialEq for CompareCostReverse<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.total_cost == other.total_cost
    }
}
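
// A minimal, self-contained sketch (added for illustration; not part of the
// original file) of the same top-K trick using `std::cmp::Reverse`: with the
// reversed ordering, the `BinaryHeap` root is the *cheapest* retained entry,
// so evicting it whenever a more expensive item arrives keeps the K most
// expensive items, just like the `CompareCostReverse` heap below.
#[cfg(test)]
mod top_k_heap_sketch {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    #[test]
    fn keeps_k_most_expensive() {
        const K: usize = 3;
        let costs = [7u64, 1, 9, 4, 8, 2];
        let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::new();
        for &cost in &costs {
            if heap.len() < K {
                heap.push(Reverse(cost));
            } else if cost > heap.peek().unwrap().0 {
                // The root holds the smallest retained cost; replace it.
                heap.pop();
                heap.push(Reverse(cost));
            }
        }
        let mut kept: Vec<u64> = heap.into_iter().map(|r| r.0).collect();
        kept.sort_unstable();
        assert_eq!(kept, vec![7, 8, 9]);
    }
}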

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write convert_cost if present
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // If the file has been pruned by a pruner, the build part cost may be zero.
                // If we didn't read any ranges from it, we don't output the file.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // If current cost is higher than the minimum in our top-10, replace it
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
             \"num_peak_range_builders\":{num_peak_range_builders}, \
             \"stream_eof\":{stream_eof}}}"
        )
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Attaches the `convert_cost` to the metrics set.
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
        } = other;

        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Merge optional verbose metrics
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track memory usage and update peak.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track number of builders and update peak.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }

    /// Merges per-file metrics.
    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
        for (file_id, metrics) in other {
            self_file_metrics
                .entry(*file_id)
                .or_default()
                .merge_from(metrics);
        }
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
            divider_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
        self.distributor_divider_cost += *divider_cost;
    }

    /// Observes metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        if let Some(time) = &self.convert_cost {
            READ_STAGE_ELAPSED
                .with_label_values(&["convert"])
                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
        }
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        #[cfg(feature = "vector_index")]
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }
}

struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that are only written to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl MergeMetricsReport for PartitionMetricsInner {
    fn report(&self, metrics: &mut MergeMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        // Merge the metrics into scan_metrics
        scan_metrics.merge_metrics.merge(metrics);

        // Reset the input metrics
        *metrics = MergeMetrics::default();
    }
}

impl DedupMetricsReport for PartitionMetricsInner {
    fn report(&self, metrics: &mut DedupMetrics) {
        let mut scan_metrics = self.metrics.lock().unwrap();
        // Merge the metrics into scan_metrics
        scan_metrics.dedup_metrics.merge(metrics);

        // Reset the input metrics
        *metrics = DedupMetrics::default();
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        }
    }
}

/// List of PartitionMetrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats verbose metrics for each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}
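
// Illustrative shape of the output appended by `format_verbose_metrics` (an
// added editorial note): it completes a JSON-like object opened by the caller.
//
// , "metrics_per_partition": [{"partition":0, "metrics":{...}}, ...]}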

/// Metrics while reading a partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics] into this partition's `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and adds its `build_cost` to `build_parts_cost`.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        // Merge per-file metrics if provided
        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }
}

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of sends that timed out in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends that found the channel full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
    /// Duration spent in divider operations.
    pub(crate) divider_cost: Duration,
}

/// Scans memtable ranges at `index`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        file_or_mem_index = %index.index,
        row_group_index = %index.row_group_index,
        source = "mem"
    )
)]
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
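
// A minimal sketch (added for illustration; not part of the original file) of
// the `async_stream::try_stream!` pattern used by the scan functions here:
// the macro expands to an `impl Stream<Item = Result<T, E>>`, `yield v` emits
// `Ok(v)`, and `?` terminates the stream with an `Err` item.
//
// fn count_to_three() -> impl Stream<Item = Result<u32>> {
//     try_stream! {
//         for i in 0..3 {
//             yield i; // Emitted to the consumer as `Ok(i)`.
//         }
//     }
// }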

/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = "mem_flat"
    )
)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Report the memtable scan metrics to partition metrics
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}
/// Files with row counts greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Series count threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum batch size after splitting. The threshold is below 60 because a series may
/// have only 60 samples per hour (one sample per minute).
const BATCH_SIZE_THRESHOLD: u64 = 50;

/// Returns true if splitting flat record batches may improve merge performance.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> bool {
    // Number of files to split and scan.
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    // Checks each file range, returns early if any range is not splittable.
    // For mem ranges, we collect the total number of rows and series because the number of rows in a
    // mem range may be too small.
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            let memtable = &stream_ctx.input.memtables[index.index];
            // Is mem range
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // This is a file range.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
                continue;
            }
            debug_assert!(file.meta_ref().num_rows > 0);
            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
                // We can't split batches in a file.
                return false;
            } else {
                num_files_to_split += 1;
            }
        }
        // Skips non-file and non-mem ranges.
    }

    if num_files_to_split > 0 {
        // We mainly consider file ranges because they have enough data for sampling.
        true
    } else if num_mem_series > 0 && num_mem_rows > 0 {
        // If we don't have files to scan, we check whether to split by the memtable.
        can_split_series(num_mem_rows as u64, num_mem_series as u64)
    } else {
        false
    }
}
fn can_split_series(num_rows: u64, num_series: u64) -> bool {
    assert!(num_series > 0);
    assert!(num_rows > 0);

    // Either there aren't too many series, or each series still averages enough rows per batch.
    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
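
// Concrete threshold cases for `can_split_series` (an added sketch, not part
// of the original file). With NUM_SERIES_THRESHOLD = 10240 and
// BATCH_SIZE_THRESHOLD = 50, splitting is allowed when the range has few
// series, or when each series still averages at least 50 rows.
#[cfg(test)]
mod can_split_series_sketch {
    use super::*;

    #[test]
    fn threshold_cases() {
        // Few series: splittable regardless of rows per series.
        assert!(can_split_series(100, 10));
        // Many series, but 1_024_000 / 10_240 = 100 rows per series on average.
        assert!(can_split_series(1_024_000, 10_240));
        // Many series with only 10 rows each on average: not splittable.
        assert!(!can_split_series(102_400, 10_240));
    }
}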

/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
/// based on the `explain_verbose` flag.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
    if explain_verbose {
        ReaderFilterMetrics {
            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
            ..Default::default()
        }
    } else {
        ReaderFilterMetrics::default()
    }
}

/// Scans file ranges at `index`.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    partition_pruner: Arc<PartitionPruner>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = partition_pruner
        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}

/// Scans file ranges at `index` using flat reader that returns RecordBatch.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    partition_pruner: Arc<PartitionPruner>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = partition_pruner
        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}

/// Builds the stream that scans the input [`FileRange`]s.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();

                // Update per-file metrics if tracking is enabled
                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                    let file_id = range.file_handle().file_id();
                    let file_metrics = file_metrics_map.entry(file_id).or_default();

                    file_metrics.num_ranges += 1;
                    file_metrics.num_rows += prune_metrics.num_rows;
                    file_metrics.build_reader_cost += build_cost;
                    file_metrics.scan_cost += prune_metrics.scan_cost;
                }

                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}
1440
/// Builds a stream that scans the input [`FileRange`]s with a flat reader, yielding [`RecordBatch`]es.
#[tracing::instrument(
    skip_all,
    fields(read_type = read_type, range_count = ranges.len())
)]
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let fetch_metrics = if part_metrics.explain_verbose() {
            Some(Arc::new(ParquetFetchMetrics::default()))
        } else {
            None
        };
        let reader_metrics = &mut ReaderMetrics {
            fetch_metrics: fetch_metrics.clone(),
            ..Default::default()
        };
        for range in ranges {
            let build_reader_start = Instant::now();
            let Some(mut reader) = range.flat_reader(fetch_metrics.as_deref()).await? else {
                continue;
            };
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

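            // Builds the optional compat adapter that converts batches to the expected flat schema.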
            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;

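            // Applies the compaction projection, if any, before the compat conversion.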
            let mapper = range.compaction_projection_mapper();
            while let Some(record_batch) = reader.next_batch()? {
                let record_batch = if let Some(mapper) = mapper {
                    mapper.project(record_batch)?
                } else {
                    record_batch
                };

                if let Some(flat_compat) = may_compat {
                    yield flat_compat.compat(record_batch)?;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();

            // Updates per-file metrics if tracking is enabled.
            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
                let file_id = range.file_handle().file_id();
                let file_metrics = file_metrics_map.entry(file_id).or_default();

                file_metrics.num_ranges += 1;
                file_metrics.num_rows += prune_metrics.num_rows;
                file_metrics.build_reader_cost += build_cost;
                file_metrics.scan_cost += prune_metrics.scan_cost;
            }

            reader_metrics.merge_from(&prune_metrics);
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
    }
}

/// Builds a stream that scans the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

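/// Scans "other" ranges, i.e. extension ranges beyond memtable and file ranges.
/// Without the `enterprise` feature there are no such ranges, so this returns an error.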
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

/// Builds a stream that scans, in flat format, the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.flat_reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

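/// Scans "other" ranges (extension ranges) in flat format.
/// Without the `enterprise` feature there are no such ranges, so this returns an error.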
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_flat_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable in flat format",
        }
        .fail()
    }
}

/// A stream wrapper that splits the [`RecordBatch`]es from an inner stream so that
/// each output batch contains non-decreasing timestamps.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    batches: VecDeque<RecordBatch>,
}

impl<S> SplitRecordBatchStream<S> {
    /// Creates a new splitting stream wrapper.
    pub(crate) fn new(inner: S) -> Self {
        Self {
            inner,
            batches: VecDeque::new(),
        }
    }
}

impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // First, returns a buffered split batch if any.
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Polls the inner stream for the next batch.
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Splits the batch and buffers the results, then loops to return the first one.
            split_record_batch(record_batch, &mut self.batches);
        }
    }
}

/// Splits the batch wherever the timestamp decreases, so that every output
/// batch contains non-decreasing timestamps.
///
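/// For example, a batch with timestamps `[1, 2, 3, 1, 2]` is split into two
/// batches with timestamps `[1, 2, 3]` and `[1, 2]`.
///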
/// # Panics
/// Panics if the timestamp array is invalid.
pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
    let batch_rows = record_batch.num_rows();
    if batch_rows == 0 {
        return;
    }
    if batch_rows < 2 {
        batches.push_back(record_batch);
        return;
    }

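    // In the flat format, the time index column's position is derived from the column count.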
    let time_index_pos = time_index_column_index(record_batch.num_columns());
    let timestamps = record_batch.column(time_index_pos);
    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
    let mut offsets = Vec::with_capacity(16);
    offsets.push(0);
    let values = ts_values.values();
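    // Records a split point wherever a timestamp is smaller than its predecessor.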
    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
        if value > values[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(values.len());

    // Splits the batch by offsets.
    for window in offsets.windows(2) {
        let (start, end) = (window[0], window[1]);
        batches.push_back(record_batch.slice(start, end - start));
    }
}