Skip to main content

mito2/read/
scan_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for scanners.
16
17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use store_api::storage::RegionId;
33
34use crate::error::Result;
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
41use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
42use crate::read::pruner::PartitionPruner;
43use crate::read::range::{RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
51use crate::sst::parquet::flat_format::time_index_column_index;
52use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
53use crate::sst::parquet::row_group::ParquetFetchMetrics;
54use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
55
/// Scan metrics collected for a single file.
///
/// Every field is additive, so metrics gathered separately for the same file
/// can be combined with [`FileScanMetrics::merge_from`].
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}

impl fmt::Debug for FileScanMetrics {
    /// Writes the metrics as a compact `{"key":value, ...}` object.
    ///
    /// `build_part_cost` is always written; each remaining field is omitted
    /// while it is zero.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            num_ranges,
            num_rows,
            build_part_cost,
            build_reader_cost,
            scan_cost,
        } = self;

        write!(f, "{{\"build_part_cost\":\"{build_part_cost:?}\"")?;

        if *num_ranges != 0 {
            write!(f, ", \"num_ranges\":{num_ranges}")?;
        }
        if *num_rows != 0 {
            write!(f, ", \"num_rows\":{num_rows}")?;
        }
        if !build_reader_cost.is_zero() {
            write!(f, ", \"build_reader_cost\":\"{build_reader_cost:?}\"")?;
        }
        if !scan_cost.is_zero() {
            write!(f, ", \"scan_cost\":\"{scan_cost:?}\"")?;
        }

        f.write_str("}")
    }
}

impl FileScanMetrics {
    /// Adds every counter and duration of `other` onto `self`.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        // Destructure exhaustively so a newly added field cannot be forgotten here.
        let FileScanMetrics {
            num_ranges,
            num_rows,
            build_part_cost,
            build_reader_cost,
            scan_cost,
        } = other;

        self.num_ranges += *num_ranges;
        self.num_rows += *num_rows;
        self.build_part_cost += *build_part_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
    }
}
106
/// Verbose scan metrics for a partition.
///
/// Values are accumulated through the `merge_*` / `set_*` methods of this type
/// and rendered by its [`fmt::Debug`] implementation.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration to convert [`Batch`]es.
    ///
    /// `None` until a timer is attached via `with_convert_cost`.
    convert_cost: Option<Time>,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,
    /// Duration of prefilter in memtable scan.
    mem_prefilter_cost: Duration,
    /// Number of rows filtered by prefilter in memtable scan.
    mem_prefilter_rows_filtered: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of index result cache hits for fulltext index.
    fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    minmax_cache_miss: usize,
    /// Number of pruner builder cache hits.
    pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pruner_prune_cost: Duration,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    // Series distributor related metrics:
    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of times a send found the channel full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,
    /// Duration spent in divider operations.
    distributor_divider_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory usage for file range builders.
    ///
    /// Running sum of the `metadata_mem_size` values merged from reader
    /// metrics; not part of the `Debug` output, only its peak is.
    build_ranges_mem_size: isize,
    /// Peak memory usage for file range builders.
    build_ranges_peak_mem_size: isize,
    /// Current number of file range builders.
    ///
    /// Running sum of the `num_range_builders` values merged from reader
    /// metrics; not part of the `Debug` output, only its peak is.
    num_range_builders: isize,
    /// Peak number of file range builders.
    num_peak_range_builders: isize,
    /// Total bytes added to the range cache during this scan.
    range_cache_size: usize,
    /// Number of range cache hits during this scan.
    range_cache_hit: usize,
    /// Number of range cache misses during this scan.
    range_cache_miss: usize,
}
260
261/// Wrapper for file metrics that compares by total cost in reverse order.
262/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
263struct CompareCostReverse<'a> {
264    total_cost: Duration,
265    file_id: RegionFileId,
266    metrics: &'a FileScanMetrics,
267}
268
269impl Ord for CompareCostReverse<'_> {
270    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
271        // Reverse comparison: smaller costs are "greater"
272        other.total_cost.cmp(&self.total_cost)
273    }
274}
275
276impl PartialOrd for CompareCostReverse<'_> {
277    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
278        Some(self.cmp(other))
279    }
280}
281
282impl Eq for CompareCostReverse<'_> {}
283
284impl PartialEq for CompareCostReverse<'_> {
285    fn eq(&self, other: &Self) -> bool {
286        self.total_cost == other.total_cost
287    }
288}
289
290impl fmt::Debug for ScanMetricsSet {
291    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292        let ScanMetricsSet {
293            prepare_scan_cost,
294            build_reader_cost,
295            scan_cost,
296            yield_cost,
297            convert_cost,
298            total_cost,
299            num_rows,
300            num_batches,
301            num_mem_ranges,
302            num_file_ranges,
303            build_parts_cost,
304            sst_scan_cost,
305            rg_total,
306            rg_fulltext_filtered,
307            rg_inverted_filtered,
308            rg_minmax_filtered,
309            rg_bloom_filtered,
310            rg_vector_filtered,
311            rows_before_filter,
312            rows_fulltext_filtered,
313            rows_inverted_filtered,
314            rows_bloom_filtered,
315            rows_vector_filtered,
316            rows_vector_selected,
317            rows_precise_filtered,
318            fulltext_index_cache_hit,
319            fulltext_index_cache_miss,
320            inverted_index_cache_hit,
321            inverted_index_cache_miss,
322            bloom_filter_cache_hit,
323            bloom_filter_cache_miss,
324            minmax_cache_hit,
325            minmax_cache_miss,
326            pruner_cache_hit,
327            pruner_cache_miss,
328            pruner_prune_cost,
329            num_sst_record_batches,
330            num_sst_batches,
331            num_sst_rows,
332            first_poll,
333            num_series_send_timeout,
334            num_series_send_full,
335            num_distributor_rows,
336            num_distributor_batches,
337            distributor_scan_cost,
338            distributor_yield_cost,
339            distributor_divider_cost,
340            merge_metrics,
341            dedup_metrics,
342            stream_eof,
343            mem_scan_cost,
344            mem_rows,
345            mem_batches,
346            mem_series,
347            mem_prefilter_cost,
348            mem_prefilter_rows_filtered,
349            inverted_index_apply_metrics,
350            bloom_filter_apply_metrics,
351            fulltext_index_apply_metrics,
352            fetch_metrics,
353            metadata_cache_metrics,
354            per_file_metrics,
355            build_ranges_mem_size: _,
356            build_ranges_peak_mem_size,
357            num_range_builders: _,
358            num_peak_range_builders,
359            range_cache_size,
360            range_cache_hit,
361            range_cache_miss,
362        } = self;
363
364        // Write core metrics
365        write!(
366            f,
367            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
368            \"build_reader_cost\":\"{build_reader_cost:?}\", \
369            \"scan_cost\":\"{scan_cost:?}\", \
370            \"yield_cost\":\"{yield_cost:?}\", \
371            \"total_cost\":\"{total_cost:?}\", \
372            \"num_rows\":{num_rows}, \
373            \"num_batches\":{num_batches}, \
374            \"num_mem_ranges\":{num_mem_ranges}, \
375            \"num_file_ranges\":{num_file_ranges}, \
376            \"build_parts_cost\":\"{build_parts_cost:?}\", \
377            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
378            \"rg_total\":{rg_total}, \
379            \"rows_before_filter\":{rows_before_filter}, \
380            \"num_sst_record_batches\":{num_sst_record_batches}, \
381            \"num_sst_batches\":{num_sst_batches}, \
382            \"num_sst_rows\":{num_sst_rows}, \
383            \"first_poll\":\"{first_poll:?}\""
384        )?;
385
386        // Write convert_cost if present
387        if let Some(time) = convert_cost {
388            let duration = Duration::from_nanos(time.value() as u64);
389            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
390        }
391
392        // Write non-zero filter counters
393        if *rg_fulltext_filtered > 0 {
394            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
395        }
396        if *rg_inverted_filtered > 0 {
397            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
398        }
399        if *rg_minmax_filtered > 0 {
400            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
401        }
402        if *rg_bloom_filtered > 0 {
403            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
404        }
405        if *rg_vector_filtered > 0 {
406            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
407        }
408        if *rows_fulltext_filtered > 0 {
409            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
410        }
411        if *rows_inverted_filtered > 0 {
412            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
413        }
414        if *rows_bloom_filtered > 0 {
415            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
416        }
417        if *rows_vector_filtered > 0 {
418            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
419        }
420        if *rows_vector_selected > 0 {
421            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
422        }
423        if *rows_precise_filtered > 0 {
424            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
425        }
426        if *fulltext_index_cache_hit > 0 {
427            write!(
428                f,
429                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
430            )?;
431        }
432        if *fulltext_index_cache_miss > 0 {
433            write!(
434                f,
435                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
436            )?;
437        }
438        if *inverted_index_cache_hit > 0 {
439            write!(
440                f,
441                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
442            )?;
443        }
444        if *inverted_index_cache_miss > 0 {
445            write!(
446                f,
447                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
448            )?;
449        }
450        if *bloom_filter_cache_hit > 0 {
451            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
452        }
453        if *bloom_filter_cache_miss > 0 {
454            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
455        }
456        if *minmax_cache_hit > 0 {
457            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
458        }
459        if *minmax_cache_miss > 0 {
460            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
461        }
462        if *pruner_cache_hit > 0 {
463            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
464        }
465        if *pruner_cache_miss > 0 {
466            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
467        }
468        if !pruner_prune_cost.is_zero() {
469            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
470        }
471
472        // Write non-zero distributor metrics
473        if *num_series_send_timeout > 0 {
474            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
475        }
476        if *num_series_send_full > 0 {
477            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
478        }
479        if *num_distributor_rows > 0 {
480            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
481        }
482        if *num_distributor_batches > 0 {
483            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
484        }
485        if !distributor_scan_cost.is_zero() {
486            write!(
487                f,
488                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
489            )?;
490        }
491        if !distributor_yield_cost.is_zero() {
492            write!(
493                f,
494                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
495            )?;
496        }
497        if !distributor_divider_cost.is_zero() {
498            write!(
499                f,
500                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
501            )?;
502        }
503
504        // Write non-zero memtable metrics
505        if *mem_rows > 0 {
506            write!(f, ", \"mem_rows\":{mem_rows}")?;
507        }
508        if *mem_batches > 0 {
509            write!(f, ", \"mem_batches\":{mem_batches}")?;
510        }
511        if *mem_series > 0 {
512            write!(f, ", \"mem_series\":{mem_series}")?;
513        }
514        if !mem_scan_cost.is_zero() {
515            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
516        }
517        if !mem_prefilter_cost.is_zero() {
518            write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
519        }
520        if *mem_prefilter_rows_filtered > 0 {
521            write!(
522                f,
523                ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
524            )?;
525        }
526
527        // Write optional verbose metrics if they are not empty
528        if let Some(metrics) = inverted_index_apply_metrics
529            && !metrics.is_empty()
530        {
531            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
532        }
533        if let Some(metrics) = bloom_filter_apply_metrics
534            && !metrics.is_empty()
535        {
536            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
537        }
538        if let Some(metrics) = fulltext_index_apply_metrics
539            && !metrics.is_empty()
540        {
541            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
542        }
543        if let Some(metrics) = fetch_metrics
544            && !metrics.is_empty()
545        {
546            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
547        }
548        if let Some(metrics) = metadata_cache_metrics
549            && !metrics.is_empty()
550        {
551            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
552        }
553
554        // Write merge metrics if not empty
555        if !merge_metrics.scan_cost.is_zero() {
556            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
557        }
558
559        // Write dedup metrics if not empty
560        if !dedup_metrics.dedup_cost.is_zero() {
561            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
562        }
563
564        // Write top file metrics if present and non-empty
565        if let Some(file_metrics) = per_file_metrics
566            && !file_metrics.is_empty()
567        {
568            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
569            let mut heap = BinaryHeap::new();
570            for (file_id, metrics) in file_metrics.iter() {
571                let total_cost =
572                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;
573
574                // If the file has been pruned by a pruner, the build part cost may be zero.
575                // If we didn't read any ranges from it, we don't output the file.
576                if total_cost.is_zero() && metrics.num_ranges == 0 {
577                    continue;
578                }
579
580                if heap.len() < 10 {
581                    // Haven't reached 10 yet, just push
582                    heap.push(CompareCostReverse {
583                        total_cost,
584                        file_id: *file_id,
585                        metrics,
586                    });
587                } else if let Some(min_entry) = heap.peek() {
588                    // If current cost is higher than the minimum in our top-10, replace it
589                    if total_cost > min_entry.total_cost {
590                        heap.pop();
591                        heap.push(CompareCostReverse {
592                            total_cost,
593                            file_id: *file_id,
594                            metrics,
595                        });
596                    }
597                }
598            }
599
600            let top_files = heap.into_sorted_vec();
601            write!(f, ", \"top_file_metrics\": {{")?;
602            for (i, item) in top_files.iter().enumerate() {
603                let CompareCostReverse {
604                    total_cost: _,
605                    file_id,
606                    metrics,
607                } = item;
608                if i > 0 {
609                    write!(f, ", ")?;
610                }
611                write!(f, "\"{}\": {:?}", file_id, metrics)?;
612            }
613            write!(f, "}}")?;
614        }
615
616        if *range_cache_size > 0 {
617            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
618        }
619        if *range_cache_hit > 0 {
620            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
621        }
622        if *range_cache_miss > 0 {
623            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
624        }
625
626        write!(
627            f,
628            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
629             \"num_peak_range_builders\":{num_peak_range_builders}, \
630             \"stream_eof\":{stream_eof}}}"
631        )
632    }
633}
634impl ScanMetricsSet {
635    /// Attaches the `prepare_scan_cost` to the metrics set.
636    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
637        self.prepare_scan_cost += cost;
638        self
639    }
640
641    /// Attaches the `convert_cost` to the metrics set.
642    fn with_convert_cost(mut self, time: Time) -> Self {
643        self.convert_cost = Some(time);
644        self
645    }
646
647    /// Merges the local scanner metrics.
648    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
649        let ScannerMetrics {
650            scan_cost,
651            yield_cost,
652            num_batches,
653            num_rows,
654        } = other;
655
656        self.scan_cost += *scan_cost;
657        self.yield_cost += *yield_cost;
658        self.num_rows += *num_rows;
659        self.num_batches += *num_batches;
660    }
661
662    /// Merges the local reader metrics.
663    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
664        let ReaderMetrics {
665            build_cost,
666            filter_metrics:
667                ReaderFilterMetrics {
668                    rg_total,
669                    rg_fulltext_filtered,
670                    rg_inverted_filtered,
671                    rg_minmax_filtered,
672                    rg_bloom_filtered,
673                    rg_vector_filtered,
674                    rows_total,
675                    rows_fulltext_filtered,
676                    rows_inverted_filtered,
677                    rows_bloom_filtered,
678                    rows_vector_filtered,
679                    rows_vector_selected,
680                    rows_precise_filtered,
681                    fulltext_index_cache_hit,
682                    fulltext_index_cache_miss,
683                    inverted_index_cache_hit,
684                    inverted_index_cache_miss,
685                    bloom_filter_cache_hit,
686                    bloom_filter_cache_miss,
687                    minmax_cache_hit,
688                    minmax_cache_miss,
689                    pruner_cache_hit,
690                    pruner_cache_miss,
691                    pruner_prune_cost,
692                    inverted_index_apply_metrics,
693                    bloom_filter_apply_metrics,
694                    fulltext_index_apply_metrics,
695                },
696            num_record_batches,
697            num_batches,
698            num_rows,
699            scan_cost,
700            metadata_cache_metrics,
701            fetch_metrics,
702            metadata_mem_size,
703            num_range_builders,
704        } = other;
705
706        self.build_parts_cost += *build_cost;
707        self.sst_scan_cost += *scan_cost;
708
709        self.rg_total += *rg_total;
710        self.rg_fulltext_filtered += *rg_fulltext_filtered;
711        self.rg_inverted_filtered += *rg_inverted_filtered;
712        self.rg_minmax_filtered += *rg_minmax_filtered;
713        self.rg_bloom_filtered += *rg_bloom_filtered;
714        self.rg_vector_filtered += *rg_vector_filtered;
715
716        self.rows_before_filter += *rows_total;
717        self.rows_fulltext_filtered += *rows_fulltext_filtered;
718        self.rows_inverted_filtered += *rows_inverted_filtered;
719        self.rows_bloom_filtered += *rows_bloom_filtered;
720        self.rows_vector_filtered += *rows_vector_filtered;
721        self.rows_vector_selected += *rows_vector_selected;
722        self.rows_precise_filtered += *rows_precise_filtered;
723
724        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
725        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
726        self.inverted_index_cache_hit += *inverted_index_cache_hit;
727        self.inverted_index_cache_miss += *inverted_index_cache_miss;
728        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
729        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
730        self.minmax_cache_hit += *minmax_cache_hit;
731        self.minmax_cache_miss += *minmax_cache_miss;
732        self.pruner_cache_hit += *pruner_cache_hit;
733        self.pruner_cache_miss += *pruner_cache_miss;
734        self.pruner_prune_cost += *pruner_prune_cost;
735
736        self.num_sst_record_batches += *num_record_batches;
737        self.num_sst_batches += *num_batches;
738        self.num_sst_rows += *num_rows;
739
740        // Merge optional verbose metrics
741        if let Some(metrics) = inverted_index_apply_metrics {
742            self.inverted_index_apply_metrics
743                .get_or_insert_with(InvertedIndexApplyMetrics::default)
744                .merge_from(metrics);
745        }
746        if let Some(metrics) = bloom_filter_apply_metrics {
747            self.bloom_filter_apply_metrics
748                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
749                .merge_from(metrics);
750        }
751        if let Some(metrics) = fulltext_index_apply_metrics {
752            self.fulltext_index_apply_metrics
753                .get_or_insert_with(FulltextIndexApplyMetrics::default)
754                .merge_from(metrics);
755        }
756        if let Some(metrics) = fetch_metrics {
757            self.fetch_metrics
758                .get_or_insert_with(ParquetFetchMetrics::default)
759                .merge_from(metrics);
760        }
761        self.metadata_cache_metrics
762            .get_or_insert_with(MetadataCacheMetrics::default)
763            .merge_from(metadata_cache_metrics);
764
765        // Track memory usage and update peak.
766        self.build_ranges_mem_size += *metadata_mem_size;
767        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
768            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
769        }
770
771        // Track number of builders and update peak.
772        self.num_range_builders += *num_range_builders;
773        if self.num_range_builders > self.num_peak_range_builders {
774            self.num_peak_range_builders = self.num_range_builders;
775        }
776    }
777
778    /// Merges per-file metrics.
779    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
780        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
781        for (file_id, metrics) in other {
782            self_file_metrics
783                .entry(*file_id)
784                .or_default()
785                .merge_from(metrics);
786        }
787    }
788
789    /// Sets distributor metrics.
790    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
791        let SeriesDistributorMetrics {
792            num_series_send_timeout,
793            num_series_send_full,
794            num_rows,
795            num_batches,
796            scan_cost,
797            yield_cost,
798            divider_cost,
799        } = distributor_metrics;
800
801        self.num_series_send_timeout += *num_series_send_timeout;
802        self.num_series_send_full += *num_series_send_full;
803        self.num_distributor_rows += *num_rows;
804        self.num_distributor_batches += *num_batches;
805        self.distributor_scan_cost += *scan_cost;
806        self.distributor_yield_cost += *yield_cost;
807        self.distributor_divider_cost += *divider_cost;
808    }
809
810    /// Observes metrics.
811    fn observe_metrics(&self) {
812        READ_STAGE_ELAPSED
813            .with_label_values(&["prepare_scan"])
814            .observe(self.prepare_scan_cost.as_secs_f64());
815        READ_STAGE_ELAPSED
816            .with_label_values(&["build_reader"])
817            .observe(self.build_reader_cost.as_secs_f64());
818        READ_STAGE_ELAPSED
819            .with_label_values(&["scan"])
820            .observe(self.scan_cost.as_secs_f64());
821        READ_STAGE_ELAPSED
822            .with_label_values(&["yield"])
823            .observe(self.yield_cost.as_secs_f64());
824        if let Some(time) = &self.convert_cost {
825            READ_STAGE_ELAPSED
826                .with_label_values(&["convert"])
827                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
828        }
829        READ_STAGE_ELAPSED
830            .with_label_values(&["total"])
831            .observe(self.total_cost.as_secs_f64());
832        READ_ROWS_RETURN.observe(self.num_rows as f64);
833        READ_BATCHES_RETURN.observe(self.num_batches as f64);
834
835        READ_STAGE_ELAPSED
836            .with_label_values(&["build_parts"])
837            .observe(self.build_parts_cost.as_secs_f64());
838
839        READ_ROW_GROUPS_TOTAL
840            .with_label_values(&["before_filtering"])
841            .inc_by(self.rg_total as u64);
842        READ_ROW_GROUPS_TOTAL
843            .with_label_values(&["fulltext_index_filtered"])
844            .inc_by(self.rg_fulltext_filtered as u64);
845        READ_ROW_GROUPS_TOTAL
846            .with_label_values(&["inverted_index_filtered"])
847            .inc_by(self.rg_inverted_filtered as u64);
848        READ_ROW_GROUPS_TOTAL
849            .with_label_values(&["minmax_index_filtered"])
850            .inc_by(self.rg_minmax_filtered as u64);
851        READ_ROW_GROUPS_TOTAL
852            .with_label_values(&["bloom_filter_index_filtered"])
853            .inc_by(self.rg_bloom_filtered as u64);
854        #[cfg(feature = "vector_index")]
855        READ_ROW_GROUPS_TOTAL
856            .with_label_values(&["vector_index_filtered"])
857            .inc_by(self.rg_vector_filtered as u64);
858
859        PRECISE_FILTER_ROWS_TOTAL
860            .with_label_values(&["parquet"])
861            .inc_by(self.rows_precise_filtered as u64);
862        READ_ROWS_IN_ROW_GROUP_TOTAL
863            .with_label_values(&["before_filtering"])
864            .inc_by(self.rows_before_filter as u64);
865        READ_ROWS_IN_ROW_GROUP_TOTAL
866            .with_label_values(&["fulltext_index_filtered"])
867            .inc_by(self.rows_fulltext_filtered as u64);
868        READ_ROWS_IN_ROW_GROUP_TOTAL
869            .with_label_values(&["inverted_index_filtered"])
870            .inc_by(self.rows_inverted_filtered as u64);
871        READ_ROWS_IN_ROW_GROUP_TOTAL
872            .with_label_values(&["bloom_filter_index_filtered"])
873            .inc_by(self.rows_bloom_filtered as u64);
874        #[cfg(feature = "vector_index")]
875        READ_ROWS_IN_ROW_GROUP_TOTAL
876            .with_label_values(&["vector_index_filtered"])
877            .inc_by(self.rows_vector_filtered as u64);
878    }
879}
880
881struct PartitionMetricsInner {
882    region_id: RegionId,
883    /// Index of the partition to scan.
884    partition: usize,
885    /// Label to distinguish different scan operation.
886    scanner_type: &'static str,
887    /// Query start time.
888    query_start: Instant,
889    /// Whether to use verbose logging.
890    explain_verbose: bool,
891    /// Verbose scan metrics that only log to debug logs by default.
892    metrics: Mutex<ScanMetricsSet>,
893    in_progress_scan: IntGauge,
894
895    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
896    /// Duration to build file ranges.
897    build_parts_cost: Time,
898    /// Duration to build the (merge) reader.
899    build_reader_cost: Time,
900    /// Duration to scan data.
901    scan_cost: Time,
902    /// Duration while waiting for `yield`.
903    yield_cost: Time,
904    /// Duration to convert [`Batch`]es.
905    convert_cost: Time,
906    /// Aggregated compute time reported to DataFusion.
907    elapsed_compute: Time,
908}
909
910impl PartitionMetricsInner {
911    fn on_finish(&self, stream_eof: bool) {
912        let mut metrics = self.metrics.lock().unwrap();
913        if metrics.total_cost.is_zero() {
914            metrics.total_cost = self.query_start.elapsed();
915        }
916        if !metrics.stream_eof {
917            metrics.stream_eof = stream_eof;
918        }
919    }
920}
921
922impl MergeMetricsReport for PartitionMetricsInner {
923    fn report(&self, metrics: &mut MergeMetrics) {
924        let mut scan_metrics = self.metrics.lock().unwrap();
925        // Merge the metrics into scan_metrics
926        scan_metrics.merge_metrics.merge(metrics);
927
928        // Reset the input metrics
929        *metrics = MergeMetrics::default();
930    }
931}
932
933impl DedupMetricsReport for PartitionMetricsInner {
934    fn report(&self, metrics: &mut DedupMetrics) {
935        let mut scan_metrics = self.metrics.lock().unwrap();
936        // Merge the metrics into scan_metrics
937        scan_metrics.dedup_metrics.merge(metrics);
938
939        // Reset the input metrics
940        *metrics = DedupMetrics::default();
941    }
942}
943
944impl Drop for PartitionMetricsInner {
945    fn drop(&mut self) {
946        self.on_finish(false);
947        let metrics = self.metrics.lock().unwrap();
948        metrics.observe_metrics();
949        self.in_progress_scan.dec();
950
951        if self.explain_verbose {
952            common_telemetry::info!(
953                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
954                self.scanner_type,
955                self.region_id,
956                self.partition,
957                metrics,
958            );
959        } else {
960            common_telemetry::debug!(
961                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
962                self.scanner_type,
963                self.region_id,
964                self.partition,
965                metrics,
966            );
967        }
968    }
969}
970
971/// List of PartitionMetrics.
972#[derive(Default)]
973pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
974
975impl PartitionMetricsList {
976    /// Sets a new [PartitionMetrics] at the specified partition.
977    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
978        let mut list = self.0.lock().unwrap();
979        if list.len() <= partition {
980            list.resize(partition + 1, None);
981        }
982        list[partition] = Some(metrics);
983    }
984
985    /// Format verbose metrics for each partition for explain.
986    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
987        let list = self.0.lock().unwrap();
988        write!(f, ", \"metrics_per_partition\": ")?;
989        f.debug_list()
990            .entries(list.iter().filter_map(|p| p.as_ref()))
991            .finish()?;
992        write!(f, "}}")
993    }
994}
995
996/// Metrics while reading a partition.
997#[derive(Clone)]
998pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
999
1000impl PartitionMetrics {
1001    pub(crate) fn new(
1002        region_id: RegionId,
1003        partition: usize,
1004        scanner_type: &'static str,
1005        query_start: Instant,
1006        explain_verbose: bool,
1007        metrics_set: &ExecutionPlanMetricsSet,
1008    ) -> Self {
1009        let partition_str = partition.to_string();
1010        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
1011        in_progress_scan.inc();
1012        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
1013        let metrics = ScanMetricsSet::default()
1014            .with_prepare_scan_cost(query_start.elapsed())
1015            .with_convert_cost(convert_cost.clone());
1016        let inner = PartitionMetricsInner {
1017            region_id,
1018            partition,
1019            scanner_type,
1020            query_start,
1021            explain_verbose,
1022            metrics: Mutex::new(metrics),
1023            in_progress_scan,
1024            build_parts_cost: MetricBuilder::new(metrics_set)
1025                .subset_time("build_parts_cost", partition),
1026            build_reader_cost: MetricBuilder::new(metrics_set)
1027                .subset_time("build_reader_cost", partition),
1028            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
1029            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
1030            convert_cost,
1031            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
1032        };
1033        Self(Arc::new(inner))
1034    }
1035
1036    pub(crate) fn on_first_poll(&self) {
1037        let mut metrics = self.0.metrics.lock().unwrap();
1038        metrics.first_poll = self.0.query_start.elapsed();
1039    }
1040
1041    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
1042        let mut metrics = self.0.metrics.lock().unwrap();
1043        metrics.num_mem_ranges += num;
1044    }
1045
1046    pub fn inc_num_file_ranges(&self, num: usize) {
1047        let mut metrics = self.0.metrics.lock().unwrap();
1048        metrics.num_file_ranges += num;
1049    }
1050
1051    fn record_elapsed_compute(&self, duration: Duration) {
1052        if duration.is_zero() {
1053            return;
1054        }
1055        self.0.elapsed_compute.add_duration(duration);
1056    }
1057
1058    /// Merges `build_reader_cost`.
1059    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
1060        self.0.build_reader_cost.add_duration(cost);
1061
1062        let mut metrics = self.0.metrics.lock().unwrap();
1063        metrics.build_reader_cost += cost;
1064    }
1065
1066    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
1067        self.0.convert_cost.add_duration(cost);
1068        self.record_elapsed_compute(cost);
1069    }
1070
1071    /// Reports memtable scan metrics.
1072    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
1073        let mut metrics = self.0.metrics.lock().unwrap();
1074        metrics.mem_scan_cost += data.scan_cost;
1075        metrics.mem_rows += data.num_rows;
1076        metrics.mem_batches += data.num_batches;
1077        metrics.mem_series += data.total_series;
1078        metrics.mem_prefilter_cost += data.prefilter_cost;
1079        metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
1080    }
1081
1082    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
1083    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
1084        self.0.scan_cost.add_duration(metrics.scan_cost);
1085        self.record_elapsed_compute(metrics.scan_cost);
1086        self.0.yield_cost.add_duration(metrics.yield_cost);
1087        self.record_elapsed_compute(metrics.yield_cost);
1088
1089        let mut metrics_set = self.0.metrics.lock().unwrap();
1090        metrics_set.merge_scanner_metrics(metrics);
1091    }
1092
1093    /// Merges [ReaderMetrics] and `build_reader_cost`.
1094    pub fn merge_reader_metrics(
1095        &self,
1096        metrics: &ReaderMetrics,
1097        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
1098    ) {
1099        self.0.build_parts_cost.add_duration(metrics.build_cost);
1100
1101        let mut metrics_set = self.0.metrics.lock().unwrap();
1102        metrics_set.merge_reader_metrics(metrics);
1103
1104        // Merge per-file metrics if provided
1105        if let Some(file_metrics) = per_file_metrics {
1106            metrics_set.merge_per_file_metrics(file_metrics);
1107        }
1108    }
1109
1110    /// Finishes the query.
1111    pub(crate) fn on_finish(&self) {
1112        self.0.on_finish(true);
1113    }
1114
1115    /// Sets the distributor metrics.
1116    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
1117        let mut metrics_set = self.0.metrics.lock().unwrap();
1118        metrics_set.set_distributor_metrics(metrics);
1119    }
1120
1121    /// Returns whether verbose explain is enabled.
1122    pub(crate) fn explain_verbose(&self) -> bool {
1123        self.0.explain_verbose
1124    }
1125
1126    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
1127    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
1128        self.0.clone()
1129    }
1130
1131    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
1132    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
1133        self.0.clone()
1134    }
1135
1136    /// Increments the total bytes added to the range cache.
1137    #[allow(dead_code)]
1138    pub(crate) fn inc_range_cache_size(&self, size: usize) {
1139        let mut metrics = self.0.metrics.lock().unwrap();
1140        metrics.range_cache_size += size;
1141    }
1142
1143    /// Increments the range cache hit counter.
1144    #[allow(dead_code)]
1145    pub(crate) fn inc_range_cache_hit(&self) {
1146        let mut metrics = self.0.metrics.lock().unwrap();
1147        metrics.range_cache_hit += 1;
1148    }
1149
1150    /// Increments the range cache miss counter.
1151    #[allow(dead_code)]
1152    pub(crate) fn inc_range_cache_miss(&self) {
1153        let mut metrics = self.0.metrics.lock().unwrap();
1154        metrics.range_cache_miss += 1;
1155    }
1156}
1157
1158impl fmt::Debug for PartitionMetrics {
1159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1160        let metrics = self.0.metrics.lock().unwrap();
1161        write!(
1162            f,
1163            r#"{{"partition":{}, "metrics":{:?}}}"#,
1164            self.0.partition, metrics
1165        )
1166    }
1167}
1168
/// Metrics collected by the series distributor.
///
/// All counters start at zero and all durations at [`Duration::ZERO`].
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// How many sends timed out in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// How many sends hit a full channel in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Rows scanned by the series distributor.
    pub(crate) num_rows: usize,
    /// Batches scanned by the series distributor.
    pub(crate) num_batches: usize,
    /// Time the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
    /// Time spent in divider operations.
    pub(crate) divider_cost: Duration,
}
1187
1188/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
1189#[tracing::instrument(
1190    skip_all,
1191    fields(
1192        region_id = %stream_ctx.input.region_metadata().region_id,
1193        row_group_index = %index.index,
1194        source = "mem_flat"
1195    )
1196)]
1197pub(crate) fn scan_flat_mem_ranges(
1198    stream_ctx: Arc<StreamContext>,
1199    part_metrics: PartitionMetrics,
1200    index: RowGroupIndex,
1201    time_range: FileTimeRange,
1202) -> impl Stream<Item = Result<RecordBatch>> {
1203    try_stream! {
1204        let ranges = stream_ctx.input.build_mem_ranges(index);
1205        part_metrics.inc_num_mem_ranges(ranges.len());
1206        for range in ranges {
1207            let build_reader_start = Instant::now();
1208            let mem_scan_metrics = Some(MemScanMetrics::default());
1209            let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
1210            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1211
1212            while let Some(record_batch) = iter.next().transpose()? {
1213                yield record_batch;
1214            }
1215
1216            // Report the memtable scan metrics to partition metrics
1217            if let Some(ref metrics) = mem_scan_metrics {
1218                let data = metrics.data();
1219                part_metrics.report_mem_scan_metrics(&data);
1220            }
1221        }
1222    }
1223}
1224
1225/// Files with row count greater than this threshold can contribute to the estimation.
1226const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
1227/// Number of series threshold for splitting batches.
1228const NUM_SERIES_THRESHOLD: u64 = 10240;
1229/// Minimum batch size after splitting. The batch size is less than 60 because a series may only have
1230/// 60 samples per hour.
1231const BATCH_SIZE_THRESHOLD: u64 = 50;
1232
1233/// Returns the estimated rows per batch after splitting if splitting flat record batches
1234/// may improve merge performance. Returns `None` if splitting is not beneficial.
1235pub(crate) fn should_split_flat_batches_for_merge(
1236    stream_ctx: &Arc<StreamContext>,
1237    range_meta: &RangeMeta,
1238) -> Option<usize> {
1239    // Number of files to split and scan.
1240    let mut num_files_to_split = 0;
1241    let mut num_mem_rows = 0;
1242    let mut num_mem_series = 0;
1243    // Total rows and series for estimating batch size after splitting.
1244    let mut total_rows: u64 = 0;
1245    let mut total_series: u64 = 0;
1246    // Checks each file range, returns early if any range is not splittable.
1247    // For mem ranges, we collect the total number of rows and series because the number of rows in a
1248    // mem range may be too small.
1249    for index in &range_meta.row_group_indices {
1250        if stream_ctx.is_mem_range_index(*index) {
1251            let memtable = &stream_ctx.input.memtables[index.index];
1252            // Is mem range
1253            let stats = memtable.stats();
1254            num_mem_rows += stats.num_rows();
1255            num_mem_series += stats.series_count();
1256        } else if stream_ctx.is_file_range_index(*index) {
1257            // This is a file range.
1258            let file_index = index.index - stream_ctx.input.num_memtables();
1259            let file = &stream_ctx.input.files[file_index];
1260            let file_meta = file.meta_ref();
1261            if file_meta.level == 0 {
1262                // Always split level 0 files.
1263                num_files_to_split += 1;
1264                continue;
1265            } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
1266                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
1267                continue;
1268            }
1269            debug_assert!(file_meta.num_rows > 0);
1270            if !can_split_series(file_meta.num_rows, file_meta.num_series) {
1271                // We can't split batches in a file.
1272                common_telemetry::trace!(
1273                    "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
1274                    file_meta.file_id,
1275                    file_meta.level,
1276                    file_meta.num_rows,
1277                    file_meta.num_series,
1278                );
1279                return None;
1280            } else {
1281                num_files_to_split += 1;
1282                total_rows += file.meta_ref().num_rows;
1283                total_series += file.meta_ref().num_series;
1284            }
1285        }
1286        // Skips non-file and non-mem ranges.
1287    }
1288
1289    let should_split = if num_files_to_split > 0 {
1290        // We mainly consider file ranges because they have enough data for sampling.
1291        true
1292    } else if num_mem_series > 0
1293        && num_mem_rows > 0
1294        && can_split_series(num_mem_rows as u64, num_mem_series as u64)
1295    {
1296        total_rows += num_mem_rows as u64;
1297        total_series += num_mem_series as u64;
1298        true
1299    } else {
1300        false
1301    };
1302
1303    if !should_split {
1304        return None;
1305    }
1306
1307    // Estimate rows per batch after splitting.
1308    let estimated_batch_size = if total_series > 0 && total_rows > 0 {
1309        ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
1310    } else {
1311        // No valid estimate available, use a conservative fallback.
1312        DEFAULT_READ_BATCH_SIZE / 4
1313    };
1314    Some(estimated_batch_size)
1315}
1316
1317/// Computes the channel size for parallel scan based on the estimated rows per batch.
1318/// The channel should buffer approximately `2 * DEFAULT_READ_BATCH_SIZE` rows.
1319pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
1320    let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
1321    size.clamp(2, 64)
1322}
1323
1324/// Computes the average estimated rows per batch across multiple range readers.
1325pub(crate) fn compute_average_batch_size(
1326    estimated_rows_per_batch: impl IntoIterator<Item = usize>,
1327) -> usize {
1328    let mut total = 0usize;
1329    let mut count = 0usize;
1330    for size in estimated_rows_per_batch {
1331        total += size;
1332        count += 1;
1333    }
1334
1335    if count == 0 {
1336        return DEFAULT_READ_BATCH_SIZE;
1337    }
1338
1339    (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
1340}
1341
1342fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1343    if num_rows == 0 || num_series == 0 {
1344        return false;
1345    }
1346
1347    // It doesn't have too many series or it will have enough rows for each batch.
1348    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1349}
1350
#[cfg(test)]
mod split_tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{ScanInput, StreamContext};
    use crate::sst::file::FileHandle;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    /// Builds a [`StreamContext`] whose scan input only contains `files`.
    async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();

        StreamContext {
            input: ScanInput::new(env.access_layer.clone(), mapper).with_files(files),
            ranges: vec![],
            scan_fingerprint: None,
            query_start: std::time::Instant::now(),
        }
    }

    /// Builds a [`RangeMeta`] that refers to row group 0 of the source at index 0.
    fn single_file_range_meta() -> RangeMeta {
        let time_range = (
            Timestamp::new_millisecond(0),
            Timestamp::new_millisecond(1000),
        );

        RangeMeta {
            time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 1024,
        }
    }

    #[tokio::test]
    async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
        // A level 0 file without series statistics (`num_series == 0`).
        let mut meta = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
            .meta_ref()
            .clone();
        meta.level = 0;
        meta.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
        meta.num_row_groups = 1;
        meta.num_series = 0;

        let handle = FileHandle::new(meta, crate::test_util::new_noop_file_purger());
        let stream_ctx = Arc::new(new_stream_context_with_files(vec![handle]).await);

        let estimate = should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta());
        assert!(estimate.is_some());
    }

    #[test]
    fn can_split_series_returns_false_for_zero_inputs() {
        for (num_rows, num_series) in [(0, 1), (1, 0), (0, 0)] {
            assert!(!can_split_series(num_rows, num_series));
        }
    }
}
1425
1426/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
1427/// based on the `explain_verbose` flag.
1428fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1429    if explain_verbose {
1430        ReaderFilterMetrics {
1431            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1432            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1433            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1434            ..Default::default()
1435        }
1436    } else {
1437        ReaderFilterMetrics::default()
1438    }
1439}
1440
1441/// Scans file ranges at `index` using flat reader that returns RecordBatch.
1442#[tracing::instrument(
1443    skip_all,
1444    fields(
1445        region_id = %stream_ctx.input.region_metadata().region_id,
1446        row_group_index = %index.index,
1447        source = read_type
1448    )
1449)]
1450pub(crate) async fn scan_flat_file_ranges(
1451    stream_ctx: Arc<StreamContext>,
1452    part_metrics: PartitionMetrics,
1453    index: RowGroupIndex,
1454    read_type: &'static str,
1455    partition_pruner: Arc<PartitionPruner>,
1456) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1457    let mut reader_metrics = ReaderMetrics {
1458        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1459        ..Default::default()
1460    };
1461    let ranges = partition_pruner
1462        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1463        .await?;
1464    part_metrics.inc_num_file_ranges(ranges.len());
1465    part_metrics.merge_reader_metrics(&reader_metrics, None);
1466
1467    // Creates initial per-file metrics with build_part_cost.
1468    let init_per_file_metrics = if part_metrics.explain_verbose() {
1469        let file = stream_ctx.input.file_from_index(index);
1470        let file_id = file.file_id();
1471
1472        let mut map = HashMap::new();
1473        map.insert(
1474            file_id,
1475            FileScanMetrics {
1476                build_part_cost: reader_metrics.build_cost,
1477                ..Default::default()
1478            },
1479        );
1480        Some(map)
1481    } else {
1482        None
1483    };
1484
1485    Ok(build_flat_file_range_scan_stream(
1486        stream_ctx,
1487        part_metrics,
1488        read_type,
1489        ranges,
1490        init_per_file_metrics,
1491    ))
1492}
1493
1494/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
1495#[tracing::instrument(
1496    skip_all,
1497    fields(read_type = read_type, range_count = ranges.len())
1498)]
1499pub fn build_flat_file_range_scan_stream(
1500    stream_ctx: Arc<StreamContext>,
1501    part_metrics: PartitionMetrics,
1502    read_type: &'static str,
1503    ranges: SmallVec<[FileRange; 2]>,
1504    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1505) -> impl Stream<Item = Result<RecordBatch>> {
1506    try_stream! {
1507        let fetch_metrics = if part_metrics.explain_verbose() {
1508            Some(Arc::new(ParquetFetchMetrics::default()))
1509        } else {
1510            None
1511        };
1512        let reader_metrics = &mut ReaderMetrics {
1513            fetch_metrics: fetch_metrics.clone(),
1514            ..Default::default()
1515        };
1516        for range in ranges {
1517            let build_reader_start = Instant::now();
1518            let Some(mut reader) = range
1519                .flat_reader(
1520                    stream_ctx.input.series_row_selector,
1521                    fetch_metrics.as_deref(),
1522                )
1523                .await?
1524            else {
1525                continue;
1526            };
1527            let build_cost = build_reader_start.elapsed();
1528            part_metrics.inc_build_reader_cost(build_cost);
1529
1530            let may_compat = range.compat_batch();
1531
1532            let mapper = range.compaction_projection_mapper();
1533            while let Some(record_batch) = reader.next_batch().await? {
1534                let record_batch = if let Some(mapper) = mapper {
1535                    let batch = mapper.project(record_batch)?;
1536                    batch
1537                } else {
1538                    record_batch
1539                };
1540
1541                if let Some(flat_compat) = may_compat {
1542                    let batch = flat_compat.compat(record_batch)?;
1543                    yield batch;
1544                } else {
1545                    yield record_batch;
1546                }
1547            }
1548
1549            let prune_metrics = reader.metrics();
1550
1551            // Update per-file metrics if tracking is enabled
1552            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1553                let file_id = range.file_handle().file_id();
1554                let file_metrics = file_metrics_map
1555                    .entry(file_id)
1556                    .or_insert_with(FileScanMetrics::default);
1557
1558                file_metrics.num_ranges += 1;
1559                file_metrics.num_rows += prune_metrics.num_rows;
1560                file_metrics.build_reader_cost += build_cost;
1561                file_metrics.scan_cost += prune_metrics.scan_cost;
1562            }
1563
1564            reader_metrics.merge_from(&prune_metrics);
1565        }
1566
1567        // Reports metrics.
1568        reader_metrics.observe_rows(read_type);
1569        reader_metrics.filter_metrics.observe();
1570        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1571    }
1572}
1573
/// Builds the stream that scans, in flat format, the extension range denoted by
/// the [`RowGroupIndex`].
///
/// Failures of the extension reader are wrapped with the `ScanExternalRange`
/// error context.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
    options: crate::extension::ExtensionRangeReadOptions,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let extension_range = context.input.extension_range(index.index);
    let extension_reader = extension_range.flat_reader(context.as_ref(), options);
    let record_batches = extension_reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(record_batches)
}
1592
1593pub(crate) async fn maybe_scan_flat_other_ranges(
1594    context: &Arc<StreamContext>,
1595    index: RowGroupIndex,
1596    metrics: &PartitionMetrics,
1597    pre_filter_mode: PreFilterMode,
1598) -> Result<BoxedRecordBatchStream> {
1599    #[cfg(feature = "enterprise")]
1600    {
1601        let options = crate::extension::ExtensionRangeReadOptions { pre_filter_mode };
1602        scan_flat_extension_range(context.clone(), index, metrics.clone(), options).await
1603    }
1604
1605    #[cfg(not(feature = "enterprise"))]
1606    {
1607        let _ = context;
1608        let _ = index;
1609        let _ = metrics;
1610        let _ = pre_filter_mode;
1611
1612        crate::error::UnexpectedSnafu {
1613            reason: "no other ranges scannable in flat format",
1614        }
1615        .fail()
1616    }
1617}
1618
/// A stream wrapper that splits record batches from an inner stream.
///
/// Each batch produced by `inner` is handed to [`split_record_batch`]; the resulting
/// pieces are queued in `batches` and yielded one at a time, in order.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    ///
    /// Pieces are appended at the back when a batch is split and popped from the front
    /// when the stream is polled.
    batches: VecDeque<RecordBatch>,
}
1626
1627impl<S> SplitRecordBatchStream<S> {
1628    /// Creates a new splitting stream wrapper.
1629    pub(crate) fn new(inner: S) -> Self {
1630        Self {
1631            inner,
1632            batches: VecDeque::new(),
1633        }
1634    }
1635}
1636
1637impl<S> Stream for SplitRecordBatchStream<S>
1638where
1639    S: Stream<Item = Result<RecordBatch>> + Unpin,
1640{
1641    type Item = Result<RecordBatch>;
1642
1643    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1644        loop {
1645            // First, check if we have buffered split batches
1646            if let Some(batch) = self.batches.pop_front() {
1647                return Poll::Ready(Some(Ok(batch)));
1648            }
1649
1650            // Poll the inner stream for the next batch
1651            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1652                Some(Ok(batch)) => batch,
1653                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1654                None => return Poll::Ready(None),
1655            };
1656
1657            // Split the batch and buffer the results
1658            split_record_batch(record_batch, &mut self.batches);
1659            // Continue the loop to return the first split batch
1660        }
1661    }
1662}
1663
1664/// Splits the batch by timestamps.
1665///
1666/// # Panics
1667/// Panics if the timestamp array is invalid.
1668pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1669    let batch_rows = record_batch.num_rows();
1670    if batch_rows == 0 {
1671        return;
1672    }
1673    if batch_rows < 2 {
1674        batches.push_back(record_batch);
1675        return;
1676    }
1677
1678    let time_index_pos = time_index_column_index(record_batch.num_columns());
1679    let timestamps = record_batch.column(time_index_pos);
1680    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1681    let mut offsets = Vec::with_capacity(16);
1682    offsets.push(0);
1683    let values = ts_values.values();
1684    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1685        if value > values[i + 1] {
1686            offsets.push(i + 1);
1687        }
1688    }
1689    offsets.push(values.len());
1690
1691    // Splits the batch by offsets.
1692    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1693        let end = offsets[i + 1];
1694        let rows_in_batch = end - start;
1695        batches.push_back(record_batch.slice(start, rows_in_batch));
1696    }
1697}
1698
1699#[cfg(test)]
1700mod tests {
1701    use std::sync::Arc;
1702    use std::time::Instant;
1703
1704    use common_time::Timestamp;
1705    use smallvec::{SmallVec, smallvec};
1706    use store_api::storage::RegionId;
1707
1708    use super::*;
1709    use crate::cache::CacheStrategy;
1710    use crate::memtable::{
1711        BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
1712        MemtableRangeContext, MemtableStats,
1713    };
1714    use crate::read::flat_projection::FlatProjectionMapper;
1715    use crate::read::range::{MemRangeBuilder, SourceIndex};
1716    use crate::read::scan_region::ScanInput;
1717    use crate::sst::file::{FileHandle, FileMeta};
1718    use crate::sst::file_purger::NoopFilePurger;
1719    use crate::test_util::memtable_util::metadata_for_test;
1720    use crate::test_util::scheduler_util::SchedulerEnv;
1721
    /// An [`IterBuilder`] for tests whose iterators never yield anything.
    struct EmptyIterBuilder;

    impl IterBuilder for EmptyIterBuilder {
        /// Returns an empty batch iterator; `_metrics` is ignored.
        fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }

        /// Always reports `true`.
        fn is_record_batch(&self) -> bool {
            true
        }

        /// Returns an empty record batch iterator; both arguments are ignored.
        fn build_record_batch(
            &self,
            _time_range: Option<(Timestamp, Timestamp)>,
            _metrics: Option<MemScanMetrics>,
        ) -> Result<BoxedRecordBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }
    }
1741
1742    async fn new_test_stream_ctx(
1743        files: Vec<FileHandle>,
1744        memtables: Vec<MemRangeBuilder>,
1745    ) -> Arc<StreamContext> {
1746        let env = SchedulerEnv::new().await;
1747        let metadata = metadata_for_test();
1748        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
1749        let input = ScanInput::new(env.access_layer.clone(), mapper)
1750            .with_cache(CacheStrategy::Disabled)
1751            .with_memtables(memtables)
1752            .with_files(files);
1753
1754        Arc::new(StreamContext {
1755            input,
1756            ranges: Vec::new(),
1757            scan_fingerprint: None,
1758            query_start: Instant::now(),
1759        })
1760    }
1761
1762    fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
1763        let meta = FileMeta {
1764            region_id: RegionId::new(123, 456),
1765            file_id: Default::default(),
1766            level: 1,
1767            time_range: (
1768                Timestamp::new_millisecond(0),
1769                Timestamp::new_millisecond(1000),
1770            ),
1771            num_rows,
1772            num_series,
1773            ..Default::default()
1774        };
1775        FileHandle::new(meta, Arc::new(NoopFilePurger))
1776    }
1777
1778    fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
1779        let context = Arc::new(MemtableRangeContext::new(
1780            0,
1781            Box::new(EmptyIterBuilder),
1782            Default::default(),
1783        ));
1784        let stats = MemtableStats {
1785            time_range: Some((
1786                Timestamp::new_millisecond(0),
1787                Timestamp::new_millisecond(1000),
1788            )),
1789            num_rows,
1790            num_ranges: 1,
1791            series_count,
1792            ..Default::default()
1793        };
1794        let range = MemtableRange::new(context, stats.clone());
1795        MemRangeBuilder::new(range, stats)
1796    }
1797
1798    fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
1799        let indices = row_group_indices
1800            .iter()
1801            .map(|row_group_index| SourceIndex {
1802                index: row_group_index.index,
1803                num_row_groups: 1,
1804            })
1805            .collect();
1806
1807        RangeMeta {
1808            time_range: (
1809                Timestamp::new_millisecond(0),
1810                Timestamp::new_millisecond(1000),
1811            ),
1812            indices,
1813            row_group_indices,
1814            num_rows: 0,
1815        }
1816    }
1817
1818    #[tokio::test]
1819    async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
1820        let num_rows = SPLIT_ROW_THRESHOLD * 2;
1821        let num_series = (num_rows / 100).max(1);
1822        let stream_ctx =
1823            new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
1824        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
1825            index: 0,
1826            row_group_index: 0,
1827        }]);
1828
1829        assert_eq!(
1830            Some((num_rows / num_series) as usize),
1831            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
1832        );
1833    }
1834
1835    #[tokio::test]
1836    async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
1837        let stream_ctx = new_test_stream_ctx(
1838            vec![
1839                new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
1840                new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
1841            ],
1842            vec![],
1843        )
1844        .await;
1845        let range_meta = new_test_range_meta(smallvec![
1846            RowGroupIndex {
1847                index: 0,
1848                row_group_index: 0,
1849            },
1850            RowGroupIndex {
1851                index: 1,
1852                row_group_index: 0,
1853            }
1854        ]);
1855
1856        assert_eq!(
1857            None,
1858            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
1859        );
1860    }
1861
1862    #[tokio::test]
1863    async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
1864        let num_series =
1865            (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
1866        let stream_ctx =
1867            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
1868        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
1869            index: 0,
1870            row_group_index: 0,
1871        }]);
1872
1873        assert_eq!(
1874            None,
1875            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
1876        );
1877    }
1878
1879    #[tokio::test]
1880    async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
1881        let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
1882        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
1883            index: 0,
1884            row_group_index: 0,
1885        }]);
1886
1887        assert_eq!(
1888            Some(50),
1889            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
1890        );
1891    }
1892
1893    #[tokio::test]
1894    async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
1895        let stream_ctx =
1896            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
1897        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
1898            index: 0,
1899            row_group_index: 0,
1900        }]);
1901
1902        assert_eq!(
1903            Some(DEFAULT_READ_BATCH_SIZE),
1904            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
1905        );
1906    }
1907
1908    #[test]
1909    fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
1910        assert_eq!(64, compute_parallel_channel_size(0));
1911        assert_eq!(64, compute_parallel_channel_size(1));
1912    }
1913
1914    #[test]
1915    fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
1916        assert_eq!(
1917            4,
1918            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
1919        );
1920    }
1921
1922    #[test]
1923    fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
1924        assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
1925        assert_eq!(
1926            2,
1927            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
1928        );
1929    }
1930
1931    #[test]
1932    fn test_compute_average_batch_size_uses_arithmetic_mean() {
1933        assert_eq!(24, compute_average_batch_size([16, 24, 32]));
1934    }
1935
1936    #[test]
1937    fn test_compute_average_batch_size_clamps_values() {
1938        assert_eq!(
1939            DEFAULT_READ_BATCH_SIZE,
1940            compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2])
1941        );
1942        assert_eq!(1, compute_average_batch_size([0, 1]));
1943    }
1944
1945    #[test]
1946    fn test_compute_average_batch_size_falls_back_when_empty() {
1947        assert_eq!(
1948            DEFAULT_READ_BATCH_SIZE,
1949            compute_average_batch_size(std::iter::empty())
1950        );
1951    }
1952}