Skip to main content

mito2/read/
scan_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for scanners.
16
17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::pruner::PartitionPruner;
44use crate::read::range::{RangeMeta, RowGroupIndex};
45use crate::read::scan_region::StreamContext;
46use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
47use crate::sst::file::{FileTimeRange, RegionFileId};
48use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
49use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
50use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
51use crate::sst::parquet::file_range::FileRange;
52use crate::sst::parquet::flat_format::time_index_column_index;
53use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
54use crate::sst::parquet::row_group::ParquetFetchMetrics;
55use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
56
/// Per-file scan metrics.
///
/// Rendered by its `Debug` impl as compact JSON-like text; all fields except
/// `build_part_cost` are omitted from that output when they are zero.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}
71
72impl fmt::Debug for FileScanMetrics {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
75
76        if self.num_ranges > 0 {
77            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
78        }
79        if self.num_rows > 0 {
80            write!(f, ", \"num_rows\":{}", self.num_rows)?;
81        }
82        if !self.build_reader_cost.is_zero() {
83            write!(
84                f,
85                ", \"build_reader_cost\":\"{:?}\"",
86                self.build_reader_cost
87            )?;
88        }
89        if !self.scan_cost.is_zero() {
90            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
91        }
92
93        write!(f, "}}")
94    }
95}
96
97impl FileScanMetrics {
98    /// Merges another FileMetrics into this one.
99    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
100        self.num_ranges += other.num_ranges;
101        self.num_rows += other.num_rows;
102        self.build_part_cost += other.build_part_cost;
103        self.build_reader_cost += other.build_reader_cost;
104        self.scan_cost += other.scan_cost;
105    }
106}
107
/// Verbose scan metrics for a partition.
///
/// Aggregates stream-level, memtable, SST, index, and distributor metrics for
/// one scan partition. Its `Debug` impl renders the set as JSON-like text,
/// omitting most zero-valued fields.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration to convert [`Batch`]es. `None` when no conversion timer is attached.
    convert_cost: Option<Time>,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,
    /// Duration of prefilter in memtable scan.
    mem_prefilter_cost: Duration,
    /// Number of rows filtered by prefilter in memtable scan.
    mem_prefilter_rows_filtered: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of index result cache hits for fulltext index.
    fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    minmax_cache_miss: usize,
    /// Number of pruner builder cache hits.
    pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pruner_prune_cost: Duration,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeout in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,
    /// Duration spent in divider operations.
    distributor_divider_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory usage for file range builders.
    build_ranges_mem_size: isize,
    /// Peak memory usage for file range builders.
    build_ranges_peak_mem_size: isize,
    /// Current number of file range builders.
    num_range_builders: isize,
    /// Peak number of file range builders.
    num_peak_range_builders: isize,
    /// Total bytes added to the range cache during this scan.
    range_cache_size: usize,
    /// Number of range cache hits during this scan.
    range_cache_hit: usize,
    /// Number of range cache misses during this scan.
    range_cache_miss: usize,
}
261
/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
    /// Precomputed sort key: build_part_cost + build_reader_cost + scan_cost.
    total_cost: Duration,
    /// Identifier of the file these metrics belong to.
    file_id: RegionFileId,
    /// Borrowed per-file metrics, rendered once the top-K set is known.
    metrics: &'a FileScanMetrics,
}
269
270impl Ord for CompareCostReverse<'_> {
271    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
272        // Reverse comparison: smaller costs are "greater"
273        other.total_cost.cmp(&self.total_cost)
274    }
275}
276
277impl PartialOrd for CompareCostReverse<'_> {
278    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
279        Some(self.cmp(other))
280    }
281}
282
283impl Eq for CompareCostReverse<'_> {}
284
285impl PartialEq for CompareCostReverse<'_> {
286    fn eq(&self, other: &Self) -> bool {
287        self.total_cost == other.total_cost
288    }
289}
290
impl fmt::Debug for ScanMetricsSet {
    // Renders the metrics as compact JSON-like text. Always-emitted core
    // metrics come first; most other fields are written only when non-zero
    // so EXPLAIN output stays readable.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to `ScanMetricsSet`
        // without deciding how to render it becomes a compile error here.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            minmax_cache_hit,
            minmax_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            mem_prefilter_cost,
            mem_prefilter_rows_filtered,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
            range_cache_size,
            range_cache_hit,
            range_cache_miss,
        } = self;

        // Write core metrics (always emitted, even when zero)
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write convert_cost if present (the timer stores nanoseconds)
        if let Some(time) = convert_cost {
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *minmax_cache_hit > 0 {
            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
        }
        if *minmax_cache_miss > 0 {
            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }
        if !mem_prefilter_cost.is_zero() {
            write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
        }
        if *mem_prefilter_rows_filtered > 0 {
            write!(
                f,
                ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
            )?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // If the file has been pruned by a pruner, the build part cost may be zero.
                // If we didn't read any ranges from it, we don't output the file.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // `peek` on the reversed ordering yields the cheapest of the
                    // current top-10. If current cost is higher than the minimum
                    // in our top-10, replace it.
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Ascending in reversed order == descending by actual cost, so the
            // most expensive files are printed first.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        if *range_cache_size > 0 {
            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
        }
        if *range_cache_hit > 0 {
            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
        }
        if *range_cache_miss > 0 {
            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
        }

        // Trailing always-emitted fields close the JSON object
        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
             \"num_peak_range_builders\":{num_peak_range_builders}, \
             \"stream_eof\":{stream_eof}}}"
        )
    }
}
635impl ScanMetricsSet {
636    /// Attaches the `prepare_scan_cost` to the metrics set.
637    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
638        self.prepare_scan_cost += cost;
639        self
640    }
641
642    /// Attaches the `convert_cost` to the metrics set.
643    fn with_convert_cost(mut self, time: Time) -> Self {
644        self.convert_cost = Some(time);
645        self
646    }
647
648    /// Merges the local scanner metrics.
649    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
650        let ScannerMetrics {
651            scan_cost,
652            yield_cost,
653            num_batches,
654            num_rows,
655        } = other;
656
657        self.scan_cost += *scan_cost;
658        self.yield_cost += *yield_cost;
659        self.num_rows += *num_rows;
660        self.num_batches += *num_batches;
661    }
662
    /// Merges the local reader metrics.
    ///
    /// Accumulates the SST reader's build/scan costs, row-group and row filter
    /// counters, and index-cache hit/miss counters into this set, merges the
    /// optional verbose sub-metrics, and updates the peak memory/builder-count
    /// watermarks for file range building.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive destructuring: adding a field to `ReaderMetrics` or
        // `ReaderFilterMetrics` without merging it becomes a compile error.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    minmax_cache_hit,
                    minmax_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.minmax_cache_hit += *minmax_cache_hit;
        self.minmax_cache_miss += *minmax_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Merge optional verbose metrics, lazily creating the target on first merge
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        // Unlike the metrics above, `metadata_cache_metrics` is not optional
        // on the reader side, so it is always merged.
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track memory usage and update peak.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track number of builders and update peak.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }
778
779    /// Merges per-file metrics.
780    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
781        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
782        for (file_id, metrics) in other {
783            self_file_metrics
784                .entry(*file_id)
785                .or_default()
786                .merge_from(metrics);
787        }
788    }
789
790    /// Sets distributor metrics.
791    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
792        let SeriesDistributorMetrics {
793            num_series_send_timeout,
794            num_series_send_full,
795            num_rows,
796            num_batches,
797            scan_cost,
798            yield_cost,
799            divider_cost,
800        } = distributor_metrics;
801
802        self.num_series_send_timeout += *num_series_send_timeout;
803        self.num_series_send_full += *num_series_send_full;
804        self.num_distributor_rows += *num_rows;
805        self.num_distributor_batches += *num_batches;
806        self.distributor_scan_cost += *scan_cost;
807        self.distributor_yield_cost += *yield_cost;
808        self.distributor_divider_cost += *divider_cost;
809    }
810
811    /// Observes metrics.
812    fn observe_metrics(&self) {
813        READ_STAGE_ELAPSED
814            .with_label_values(&["prepare_scan"])
815            .observe(self.prepare_scan_cost.as_secs_f64());
816        READ_STAGE_ELAPSED
817            .with_label_values(&["build_reader"])
818            .observe(self.build_reader_cost.as_secs_f64());
819        READ_STAGE_ELAPSED
820            .with_label_values(&["scan"])
821            .observe(self.scan_cost.as_secs_f64());
822        READ_STAGE_ELAPSED
823            .with_label_values(&["yield"])
824            .observe(self.yield_cost.as_secs_f64());
825        if let Some(time) = &self.convert_cost {
826            READ_STAGE_ELAPSED
827                .with_label_values(&["convert"])
828                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
829        }
830        READ_STAGE_ELAPSED
831            .with_label_values(&["total"])
832            .observe(self.total_cost.as_secs_f64());
833        READ_ROWS_RETURN.observe(self.num_rows as f64);
834        READ_BATCHES_RETURN.observe(self.num_batches as f64);
835
836        READ_STAGE_ELAPSED
837            .with_label_values(&["build_parts"])
838            .observe(self.build_parts_cost.as_secs_f64());
839
840        READ_ROW_GROUPS_TOTAL
841            .with_label_values(&["before_filtering"])
842            .inc_by(self.rg_total as u64);
843        READ_ROW_GROUPS_TOTAL
844            .with_label_values(&["fulltext_index_filtered"])
845            .inc_by(self.rg_fulltext_filtered as u64);
846        READ_ROW_GROUPS_TOTAL
847            .with_label_values(&["inverted_index_filtered"])
848            .inc_by(self.rg_inverted_filtered as u64);
849        READ_ROW_GROUPS_TOTAL
850            .with_label_values(&["minmax_index_filtered"])
851            .inc_by(self.rg_minmax_filtered as u64);
852        READ_ROW_GROUPS_TOTAL
853            .with_label_values(&["bloom_filter_index_filtered"])
854            .inc_by(self.rg_bloom_filtered as u64);
855        #[cfg(feature = "vector_index")]
856        READ_ROW_GROUPS_TOTAL
857            .with_label_values(&["vector_index_filtered"])
858            .inc_by(self.rg_vector_filtered as u64);
859
860        PRECISE_FILTER_ROWS_TOTAL
861            .with_label_values(&["parquet"])
862            .inc_by(self.rows_precise_filtered as u64);
863        READ_ROWS_IN_ROW_GROUP_TOTAL
864            .with_label_values(&["before_filtering"])
865            .inc_by(self.rows_before_filter as u64);
866        READ_ROWS_IN_ROW_GROUP_TOTAL
867            .with_label_values(&["fulltext_index_filtered"])
868            .inc_by(self.rows_fulltext_filtered as u64);
869        READ_ROWS_IN_ROW_GROUP_TOTAL
870            .with_label_values(&["inverted_index_filtered"])
871            .inc_by(self.rows_inverted_filtered as u64);
872        READ_ROWS_IN_ROW_GROUP_TOTAL
873            .with_label_values(&["bloom_filter_index_filtered"])
874            .inc_by(self.rows_bloom_filtered as u64);
875        #[cfg(feature = "vector_index")]
876        READ_ROWS_IN_ROW_GROUP_TOTAL
877            .with_label_values(&["vector_index_filtered"])
878            .inc_by(self.rows_vector_filtered as u64);
879    }
880}
881
/// Shared state behind [PartitionMetrics]; reports and logs on drop.
struct PartitionMetricsInner {
    /// Region this partition scan belongs to (used in the finish log).
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operation.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-flight scans; incremented in `PartitionMetrics::new`,
    /// decremented in `Drop`.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}
910
911impl PartitionMetricsInner {
912    fn on_finish(&self, stream_eof: bool) {
913        let mut metrics = self.metrics.lock().unwrap();
914        if metrics.total_cost.is_zero() {
915            metrics.total_cost = self.query_start.elapsed();
916        }
917        if !metrics.stream_eof {
918            metrics.stream_eof = stream_eof;
919        }
920    }
921}
922
923impl MergeMetricsReport for PartitionMetricsInner {
924    fn report(&self, metrics: &mut MergeMetrics) {
925        let mut scan_metrics = self.metrics.lock().unwrap();
926        // Merge the metrics into scan_metrics
927        scan_metrics.merge_metrics.merge(metrics);
928
929        // Reset the input metrics
930        *metrics = MergeMetrics::default();
931    }
932}
933
934impl DedupMetricsReport for PartitionMetricsInner {
935    fn report(&self, metrics: &mut DedupMetrics) {
936        let mut scan_metrics = self.metrics.lock().unwrap();
937        // Merge the metrics into scan_metrics
938        scan_metrics.dedup_metrics.merge(metrics);
939
940        // Reset the input metrics
941        *metrics = DedupMetrics::default();
942    }
943}
944
945impl Drop for PartitionMetricsInner {
946    fn drop(&mut self) {
947        self.on_finish(false);
948        let metrics = self.metrics.lock().unwrap();
949        metrics.observe_metrics();
950        self.in_progress_scan.dec();
951
952        if self.explain_verbose {
953            common_telemetry::info!(
954                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
955                self.scanner_type,
956                self.region_id,
957                self.partition,
958                metrics,
959            );
960        } else {
961            common_telemetry::debug!(
962                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
963                self.scanner_type,
964                self.region_id,
965                self.partition,
966                metrics,
967            );
968        }
969    }
970}
971
972/// List of PartitionMetrics.
973#[derive(Default)]
974pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
975
976impl PartitionMetricsList {
977    /// Sets a new [PartitionMetrics] at the specified partition.
978    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
979        let mut list = self.0.lock().unwrap();
980        if list.len() <= partition {
981            list.resize(partition + 1, None);
982        }
983        list[partition] = Some(metrics);
984    }
985
986    /// Format verbose metrics for each partition for explain.
987    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
988        let list = self.0.lock().unwrap();
989        write!(f, ", \"metrics_per_partition\": ")?;
990        f.debug_list()
991            .entries(list.iter().filter_map(|p| p.as_ref()))
992            .finish()?;
993        write!(f, "}}")
994    }
995}
996
/// Metrics while reading a partition.
///
/// Cloning is cheap: all state lives behind an [`Arc`], so clones share the
/// same counters and report together when the last handle drops.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
1000
1001impl PartitionMetrics {
1002    pub(crate) fn new(
1003        region_id: RegionId,
1004        partition: usize,
1005        scanner_type: &'static str,
1006        query_start: Instant,
1007        explain_verbose: bool,
1008        metrics_set: &ExecutionPlanMetricsSet,
1009    ) -> Self {
1010        let partition_str = partition.to_string();
1011        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
1012        in_progress_scan.inc();
1013        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
1014        let metrics = ScanMetricsSet::default()
1015            .with_prepare_scan_cost(query_start.elapsed())
1016            .with_convert_cost(convert_cost.clone());
1017        let inner = PartitionMetricsInner {
1018            region_id,
1019            partition,
1020            scanner_type,
1021            query_start,
1022            explain_verbose,
1023            metrics: Mutex::new(metrics),
1024            in_progress_scan,
1025            build_parts_cost: MetricBuilder::new(metrics_set)
1026                .subset_time("build_parts_cost", partition),
1027            build_reader_cost: MetricBuilder::new(metrics_set)
1028                .subset_time("build_reader_cost", partition),
1029            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
1030            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
1031            convert_cost,
1032            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
1033        };
1034        Self(Arc::new(inner))
1035    }
1036
1037    pub(crate) fn on_first_poll(&self) {
1038        let mut metrics = self.0.metrics.lock().unwrap();
1039        metrics.first_poll = self.0.query_start.elapsed();
1040    }
1041
1042    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
1043        let mut metrics = self.0.metrics.lock().unwrap();
1044        metrics.num_mem_ranges += num;
1045    }
1046
1047    pub fn inc_num_file_ranges(&self, num: usize) {
1048        let mut metrics = self.0.metrics.lock().unwrap();
1049        metrics.num_file_ranges += num;
1050    }
1051
1052    fn record_elapsed_compute(&self, duration: Duration) {
1053        if duration.is_zero() {
1054            return;
1055        }
1056        self.0.elapsed_compute.add_duration(duration);
1057    }
1058
1059    /// Merges `build_reader_cost`.
1060    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
1061        self.0.build_reader_cost.add_duration(cost);
1062
1063        let mut metrics = self.0.metrics.lock().unwrap();
1064        metrics.build_reader_cost += cost;
1065    }
1066
1067    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
1068        self.0.convert_cost.add_duration(cost);
1069        self.record_elapsed_compute(cost);
1070    }
1071
1072    /// Reports memtable scan metrics.
1073    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
1074        let mut metrics = self.0.metrics.lock().unwrap();
1075        metrics.mem_scan_cost += data.scan_cost;
1076        metrics.mem_rows += data.num_rows;
1077        metrics.mem_batches += data.num_batches;
1078        metrics.mem_series += data.total_series;
1079        metrics.mem_prefilter_cost += data.prefilter_cost;
1080        metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
1081    }
1082
1083    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
1084    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
1085        self.0.scan_cost.add_duration(metrics.scan_cost);
1086        self.record_elapsed_compute(metrics.scan_cost);
1087        self.0.yield_cost.add_duration(metrics.yield_cost);
1088        self.record_elapsed_compute(metrics.yield_cost);
1089
1090        let mut metrics_set = self.0.metrics.lock().unwrap();
1091        metrics_set.merge_scanner_metrics(metrics);
1092    }
1093
1094    /// Merges [ReaderMetrics] and `build_reader_cost`.
1095    pub fn merge_reader_metrics(
1096        &self,
1097        metrics: &ReaderMetrics,
1098        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
1099    ) {
1100        self.0.build_parts_cost.add_duration(metrics.build_cost);
1101
1102        let mut metrics_set = self.0.metrics.lock().unwrap();
1103        metrics_set.merge_reader_metrics(metrics);
1104
1105        // Merge per-file metrics if provided
1106        if let Some(file_metrics) = per_file_metrics {
1107            metrics_set.merge_per_file_metrics(file_metrics);
1108        }
1109    }
1110
1111    /// Finishes the query.
1112    pub(crate) fn on_finish(&self) {
1113        self.0.on_finish(true);
1114    }
1115
1116    /// Sets the distributor metrics.
1117    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
1118        let mut metrics_set = self.0.metrics.lock().unwrap();
1119        metrics_set.set_distributor_metrics(metrics);
1120    }
1121
1122    /// Returns whether verbose explain is enabled.
1123    pub(crate) fn explain_verbose(&self) -> bool {
1124        self.0.explain_verbose
1125    }
1126
1127    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
1128    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
1129        self.0.clone()
1130    }
1131
1132    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
1133    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
1134        self.0.clone()
1135    }
1136
1137    /// Increments the total bytes added to the range cache.
1138    #[allow(dead_code)]
1139    pub(crate) fn inc_range_cache_size(&self, size: usize) {
1140        let mut metrics = self.0.metrics.lock().unwrap();
1141        metrics.range_cache_size += size;
1142    }
1143
1144    /// Increments the range cache hit counter.
1145    #[allow(dead_code)]
1146    pub(crate) fn inc_range_cache_hit(&self) {
1147        let mut metrics = self.0.metrics.lock().unwrap();
1148        metrics.range_cache_hit += 1;
1149    }
1150
1151    /// Increments the range cache miss counter.
1152    #[allow(dead_code)]
1153    pub(crate) fn inc_range_cache_miss(&self) {
1154        let mut metrics = self.0.metrics.lock().unwrap();
1155        metrics.range_cache_miss += 1;
1156    }
1157}
1158
1159impl fmt::Debug for PartitionMetrics {
1160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1161        let metrics = self.0.metrics.lock().unwrap();
1162        write!(
1163            f,
1164            r#"{{"partition":{}, "metrics":{:?}}}"#,
1165            self.0.partition, metrics
1166        )
1167    }
1168}
1169
/// Metrics for the series distributor.
///
/// Folded into [ScanMetricsSet] via `set_distributor_metrics`.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeout in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the series distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the series distributor to yield.
    pub(crate) yield_cost: Duration,
    /// Duration spent in divider operations.
    pub(crate) divider_cost: Duration,
}
1188
1189/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
1190#[tracing::instrument(
1191    skip_all,
1192    fields(
1193        region_id = %stream_ctx.input.region_metadata().region_id,
1194        row_group_index = %index.index,
1195        source = "mem_flat"
1196    )
1197)]
1198pub(crate) fn scan_flat_mem_ranges(
1199    stream_ctx: Arc<StreamContext>,
1200    part_metrics: PartitionMetrics,
1201    index: RowGroupIndex,
1202    time_range: FileTimeRange,
1203) -> impl Stream<Item = Result<RecordBatch>> {
1204    try_stream! {
1205        let ranges = stream_ctx.input.build_mem_ranges(index);
1206        part_metrics.inc_num_mem_ranges(ranges.len());
1207        for range in ranges {
1208            let build_reader_start = Instant::now();
1209            let mem_scan_metrics = Some(MemScanMetrics::default());
1210            let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
1211            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1212
1213            while let Some(record_batch) = iter.next().transpose()? {
1214                yield record_batch;
1215            }
1216
1217            // Report the memtable scan metrics to partition metrics
1218            if let Some(ref metrics) = mem_scan_metrics {
1219                let data = metrics.data();
1220                part_metrics.report_mem_scan_metrics(&data);
1221            }
1222        }
1223    }
1224}
1225
/// Files with row count greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches. Sources at or above this many
/// series must also keep batches large enough (see [`can_split_series`]).
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum batch size after splitting. The batch size is less than 60 because a series may only have
/// 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1233
/// Returns the estimated rows per batch after splitting if splitting flat record batches
/// may improve merge performance. Returns `None` if splitting is not beneficial.
///
/// Returns `None` as soon as any qualifying file is found to be unsplittable;
/// otherwise estimates `total_rows / total_series` clamped to a sane range.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> Option<usize> {
    // Number of files to split and scan.
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    // Total rows and series for estimating batch size after splitting.
    let mut total_rows: u64 = 0;
    let mut total_series: u64 = 0;
    // Checks each file range, returns early if any range is not splittable.
    // For mem ranges, we collect the total number of rows and series because the number of rows in a
    // mem range may be too small.
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            let memtable = &stream_ctx.input.memtables[index.index];
            // Is mem range
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // This is a file range.
            // File indices are offset by the number of memtables in the input.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            let file_meta = file.meta_ref();
            if file_meta.level == 0 {
                // Always split level 0 files.
                // Note: level-0 files count toward `num_files_to_split` but do
                // not contribute to the row/series totals used for estimation.
                num_files_to_split += 1;
                continue;
            } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
                continue;
            }
            debug_assert!(file_meta.num_rows > 0);
            if !can_split_series(file_meta.num_rows, file_meta.num_series) {
                // We can't split batches in a file.
                common_telemetry::trace!(
                    "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
                    file_meta.file_id,
                    file_meta.level,
                    file_meta.num_rows,
                    file_meta.num_series,
                );
                return None;
            } else {
                num_files_to_split += 1;
                total_rows += file.meta_ref().num_rows;
                total_series += file.meta_ref().num_series;
            }
        }
        // Skips non-file and non-mem ranges.
    }

    // Memtable stats only contribute to the estimate when no file qualified.
    let should_split = if num_files_to_split > 0 {
        // We mainly consider file ranges because they have enough data for sampling.
        true
    } else if num_mem_series > 0
        && num_mem_rows > 0
        && can_split_series(num_mem_rows as u64, num_mem_series as u64)
    {
        total_rows += num_mem_rows as u64;
        total_series += num_mem_series as u64;
        true
    } else {
        false
    };

    if !should_split {
        return None;
    }

    // Estimate rows per batch after splitting.
    // Totals can still be zero here when only level-0 files were counted.
    let estimated_batch_size = if total_series > 0 && total_rows > 0 {
        ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
    } else {
        // No valid estimate available, use a conservative fallback.
        DEFAULT_READ_BATCH_SIZE / 4
    };
    Some(estimated_batch_size)
}
1317
1318/// Computes the channel size for parallel scan based on the estimated rows per batch.
1319/// The channel should buffer approximately `2 * DEFAULT_READ_BATCH_SIZE` rows.
1320pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
1321    let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
1322    size.clamp(2, 64)
1323}
1324
1325/// Computes the average estimated rows per batch across multiple range readers.
1326pub(crate) fn compute_average_batch_size(
1327    estimated_rows_per_batch: impl IntoIterator<Item = usize>,
1328) -> usize {
1329    let mut total = 0usize;
1330    let mut count = 0usize;
1331    for size in estimated_rows_per_batch {
1332        total += size;
1333        count += 1;
1334    }
1335
1336    if count == 0 {
1337        return DEFAULT_READ_BATCH_SIZE;
1338    }
1339
1340    (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
1341}
1342
1343fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1344    if num_rows == 0 || num_series == 0 {
1345        return false;
1346    }
1347
1348    // It doesn't have too many series or it will have enough rows for each batch.
1349    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1350}
1351
#[cfg(test)]
mod split_tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::read::projection::ProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{ScanInput, StreamContext};
    use crate::sst::file::FileHandle;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    /// Builds a minimal [StreamContext] over `files` for split tests.
    async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);

        StreamContext {
            input,
            ranges: vec![],
            scan_fingerprint: None,
            query_start: std::time::Instant::now(),
        }
    }

    /// A range meta referencing a single source (index 0) with one row group.
    fn single_file_range_meta() -> RangeMeta {
        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 1024,
        }
    }

    #[tokio::test]
    async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
        let mut file = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
            .meta_ref()
            .clone();
        // Level-0 files are always split, even with `num_series == 0`, which
        // would disqualify a file at any other level.
        file.level = 0;
        file.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
        file.num_row_groups = 1;
        file.num_series = 0;

        let file = FileHandle::new(file, crate::test_util::new_noop_file_purger());
        let stream_ctx = Arc::new(new_stream_context_with_files(vec![file]).await);

        assert!(
            should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta()).is_some()
        );
    }

    #[test]
    fn can_split_series_returns_false_for_zero_inputs() {
        // Zero rows or zero series means stats are missing/empty: never split.
        assert!(!can_split_series(0, 1));
        assert!(!can_split_series(1, 0));
        assert!(!can_split_series(0, 0));
    }
}
1426
1427/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
1428/// based on the `explain_verbose` flag.
1429fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1430    if explain_verbose {
1431        ReaderFilterMetrics {
1432            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1433            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1434            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1435            ..Default::default()
1436        }
1437    } else {
1438        ReaderFilterMetrics::default()
1439    }
1440}
1441
/// Scans file ranges at `index` using flat reader that returns RecordBatch.
///
/// Builds (and index-prunes) the file ranges eagerly, merges the resulting
/// filter metrics into `part_metrics`, then returns a lazy stream that does
/// the actual row scanning.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %stream_ctx.input.region_metadata().region_id,
        row_group_index = %index.index,
        source = read_type
    )
)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    partition_pruner: Arc<PartitionPruner>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    // Detailed index-apply metrics are only allocated for verbose explain.
    let mut reader_metrics = ReaderMetrics {
        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
        ..Default::default()
    };
    let ranges = partition_pruner
        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics, None);

    // Creates initial per-file metrics with build_part_cost.
    let init_per_file_metrics = if part_metrics.explain_verbose() {
        let file = stream_ctx.input.file_from_index(index);
        let file_id = file.file_id();

        let mut map = HashMap::new();
        map.insert(
            file_id,
            FileScanMetrics {
                build_part_cost: reader_metrics.build_cost,
                ..Default::default()
            },
        );
        Some(map)
    } else {
        None
    };

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
        init_per_file_metrics,
    ))
}
1494
1495/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
1496#[tracing::instrument(
1497    skip_all,
1498    fields(read_type = read_type, range_count = ranges.len())
1499)]
1500pub fn build_flat_file_range_scan_stream(
1501    _stream_ctx: Arc<StreamContext>,
1502    part_metrics: PartitionMetrics,
1503    read_type: &'static str,
1504    ranges: SmallVec<[FileRange; 2]>,
1505    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1506) -> impl Stream<Item = Result<RecordBatch>> {
1507    try_stream! {
1508        let fetch_metrics = if part_metrics.explain_verbose() {
1509            Some(Arc::new(ParquetFetchMetrics::default()))
1510        } else {
1511            None
1512        };
1513        let reader_metrics = &mut ReaderMetrics {
1514            fetch_metrics: fetch_metrics.clone(),
1515            ..Default::default()
1516        };
1517        for range in ranges {
1518            let build_reader_start = Instant::now();
1519            let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue};
1520            let build_cost = build_reader_start.elapsed();
1521            part_metrics.inc_build_reader_cost(build_cost);
1522
1523            let may_compat = range
1524                .compat_batch()
1525                .map(|compat| {
1526                    compat.as_flat().context(UnexpectedSnafu {
1527                        reason: "Invalid compat for flat format",
1528                    })
1529                })
1530                .transpose()?;
1531
1532            let mapper = range.compaction_projection_mapper();
1533            while let Some(record_batch) = reader.next_batch().await? {
1534                let record_batch = if let Some(mapper) = mapper {
1535                    let batch = mapper.project(record_batch)?;
1536                    batch
1537                } else {
1538                    record_batch
1539                };
1540
1541                if let Some(flat_compat) = may_compat {
1542                    let batch = flat_compat.compat(record_batch)?;
1543                    yield batch;
1544                } else {
1545                    yield record_batch;
1546                }
1547            }
1548
1549            let prune_metrics = reader.metrics();
1550
1551            // Update per-file metrics if tracking is enabled
1552            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1553                let file_id = range.file_handle().file_id();
1554                let file_metrics = file_metrics_map
1555                    .entry(file_id)
1556                    .or_insert_with(FileScanMetrics::default);
1557
1558                file_metrics.num_ranges += 1;
1559                file_metrics.num_rows += prune_metrics.num_rows;
1560                file_metrics.build_reader_cost += build_cost;
1561                file_metrics.scan_cost += prune_metrics.scan_cost;
1562            }
1563
1564            reader_metrics.merge_from(&prune_metrics);
1565        }
1566
1567        // Reports metrics.
1568        reader_metrics.observe_rows(read_type);
1569        reader_metrics.filter_metrics.observe();
1570        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1571    }
1572}
1573
/// Build the stream of scanning the extension range in flat format denoted by the [`RowGroupIndex`].
///
/// Errors from the extension reader are wrapped as `ScanExternalRange`.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    // NOTE(review): assumes `index.index` addresses a valid extension range in
    // the input — confirm against the caller's range construction.
    let range = context.input.extension_range(index.index);
    let reader = range.flat_reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
1591
/// Scans ranges that are neither memtable nor file ranges.
///
/// With the `enterprise` feature this delegates to extension-range scanning;
/// otherwise there is nothing else to scan and an `Unexpected` error is
/// returned.
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_flat_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        // Silence unused-parameter warnings in the non-enterprise build.
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable in flat format",
        }
        .fail()
    }
}
1614
/// A stream wrapper that splits record batches from an inner stream.
///
/// Each batch pulled from `inner` is split into sub-batches whose timestamps
/// are non-decreasing (see [`split_record_batch`]) before being yielded.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    ///
    /// Split results are queued here and fully drained before `inner` is
    /// polled again.
    batches: VecDeque<RecordBatch>,
}
1622
1623impl<S> SplitRecordBatchStream<S> {
1624    /// Creates a new splitting stream wrapper.
1625    pub(crate) fn new(inner: S) -> Self {
1626        Self {
1627            inner,
1628            batches: VecDeque::new(),
1629        }
1630    }
1631}
1632
1633impl<S> Stream for SplitRecordBatchStream<S>
1634where
1635    S: Stream<Item = Result<RecordBatch>> + Unpin,
1636{
1637    type Item = Result<RecordBatch>;
1638
1639    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1640        loop {
1641            // First, check if we have buffered split batches
1642            if let Some(batch) = self.batches.pop_front() {
1643                return Poll::Ready(Some(Ok(batch)));
1644            }
1645
1646            // Poll the inner stream for the next batch
1647            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1648                Some(Ok(batch)) => batch,
1649                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1650                None => return Poll::Ready(None),
1651            };
1652
1653            // Split the batch and buffer the results
1654            split_record_batch(record_batch, &mut self.batches);
1655            // Continue the loop to return the first split batch
1656        }
1657    }
1658}
1659
1660/// Splits the batch by timestamps.
1661///
1662/// # Panics
1663/// Panics if the timestamp array is invalid.
1664pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1665    let batch_rows = record_batch.num_rows();
1666    if batch_rows == 0 {
1667        return;
1668    }
1669    if batch_rows < 2 {
1670        batches.push_back(record_batch);
1671        return;
1672    }
1673
1674    let time_index_pos = time_index_column_index(record_batch.num_columns());
1675    let timestamps = record_batch.column(time_index_pos);
1676    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1677    let mut offsets = Vec::with_capacity(16);
1678    offsets.push(0);
1679    let values = ts_values.values();
1680    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1681        if value > values[i + 1] {
1682            offsets.push(i + 1);
1683        }
1684    }
1685    offsets.push(values.len());
1686
1687    // Splits the batch by offsets.
1688    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1689        let end = offsets[i + 1];
1690        let rows_in_batch = end - start;
1691        batches.push_back(record_batch.slice(start, rows_in_batch));
1692    }
1693}
1694
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use smallvec::{SmallVec, smallvec};
    use store_api::storage::RegionId;

    use super::*;
    use crate::cache::CacheStrategy;
    use crate::memtable::{
        BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
        MemtableRangeContext, MemtableStats,
    };
    use crate::read::projection::ProjectionMapper;
    use crate::read::range::{MemRangeBuilder, SourceIndex};
    use crate::read::scan_region::ScanInput;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Iterator builder that yields no batches. Used to build memtable ranges
    /// whose contents are irrelevant — the tests only consume the stats.
    struct EmptyIterBuilder;

    impl IterBuilder for EmptyIterBuilder {
        fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }

        // Reports the record-batch (flat) format so flat scan paths are taken.
        fn is_record_batch(&self) -> bool {
            true
        }

        fn build_record_batch(
            &self,
            _time_range: Option<(Timestamp, Timestamp)>,
            _metrics: Option<MemScanMetrics>,
        ) -> Result<BoxedRecordBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }
    }

    /// Builds a [`StreamContext`] over `files` and `memtables` with a fixed
    /// projection. `ranges` stays empty because the tests under this module
    /// only inspect the scan input statistics.
    async fn new_test_stream_ctx(
        files: Vec<FileHandle>,
        memtables: Vec<MemRangeBuilder>,
    ) -> Arc<StreamContext> {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_cache(CacheStrategy::Disabled)
            .with_memtables(memtables)
            .with_files(files);

        Arc::new(StreamContext {
            input,
            ranges: Vec::new(),
            scan_fingerprint: None,
            query_start: Instant::now(),
        })
    }

    /// Creates a file handle whose metadata reports the given row and series
    /// counts; the time range and other fields are fixed/default.
    fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
        let meta = FileMeta {
            region_id: RegionId::new(123, 456),
            file_id: Default::default(),
            level: 1,
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            num_rows,
            num_series,
            ..Default::default()
        };
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Creates a memtable range builder whose stats report the given row and
    /// series counts; the backing iterator is empty.
    fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
        let context = Arc::new(MemtableRangeContext::new(
            0,
            Box::new(EmptyIterBuilder),
            Default::default(),
        ));
        let stats = MemtableStats {
            time_range: Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            )),
            num_rows,
            num_ranges: 1,
            series_count,
            ..Default::default()
        };
        let range = MemtableRange::new(context, stats.clone());
        MemRangeBuilder::new(range, stats)
    }

    /// Builds a [`RangeMeta`] covering the given row group indices, with one
    /// source index (of a single row group) per row group index.
    fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
        let indices = row_group_indices
            .iter()
            .map(|row_group_index| SourceIndex {
                index: row_group_index.index,
                num_row_groups: 1,
            })
            .collect();

        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices,
            row_group_indices,
            num_rows: 0,
        }
    }

    // A large file with a known series count should yield the per-series row
    // estimate (num_rows / num_series).
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
        let num_rows = SPLIT_ROW_THRESHOLD * 2;
        let num_series = (num_rows / 100).max(1);
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some((num_rows / num_series) as usize),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // Files below the row threshold, or with an unknown (zero) series count,
    // must not trigger splitting.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
        let stream_ctx = new_test_stream_ctx(
            vec![
                new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
                new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
            ],
            vec![],
        )
        .await;
        let range_meta = new_test_range_meta(smallvec![
            RowGroupIndex {
                index: 0,
                row_group_index: 0,
            },
            RowGroupIndex {
                index: 1,
                row_group_index: 0,
            }
        ]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // A file whose series count makes per-series batches too small to be
    // worth splitting should return `None`.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
        let num_series =
            (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // With no files, the estimate falls back to memtable stats
    // (5000 rows / 100 series = 50 rows per series).
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
        let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(50),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // A huge per-series estimate (single series) must be clamped down to the
    // default read batch size.
    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(DEFAULT_READ_BATCH_SIZE),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    // Tiny batches should use the maximum channel size.
    #[test]
    fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
        assert_eq!(64, compute_parallel_channel_size(0));
        assert_eq!(64, compute_parallel_channel_size(1));
    }

    // Mid-sized batches get a proportionally smaller channel.
    #[test]
    fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
        assert_eq!(
            4,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
        );
    }

    // Full-size (or larger) batches clamp to the minimum channel size.
    #[test]
    fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
        assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
        assert_eq!(
            2,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
        );
    }

    // The average is the plain arithmetic mean of the inputs.
    #[test]
    fn test_compute_average_batch_size_uses_arithmetic_mean() {
        assert_eq!(24, compute_average_batch_size([16, 24, 32]));
    }

    // The mean is clamped to [1, DEFAULT_READ_BATCH_SIZE].
    #[test]
    fn test_compute_average_batch_size_clamps_values() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2])
        );
        assert_eq!(1, compute_average_batch_size([0, 1]));
    }

    // An empty input falls back to the default batch size.
    #[test]
    fn test_compute_average_batch_size_falls_back_when_empty() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size(std::iter::empty())
        );
    }
}