Skip to main content

mito2/read/
scan_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for scanners.
16
17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use store_api::storage::RegionId;
33
34use crate::error::Result;
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
41use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
42use crate::read::pruner::PartitionPruner;
43use crate::read::range::{RangeMeta, RowGroupIndex};
44use crate::read::scan_region::StreamContext;
45use crate::read::{BoxedRecordBatchStream, ScannerMetrics};
46use crate::sst::file::{FileTimeRange, RegionFileId};
47use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
48use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
49use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
50use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
51use crate::sst::parquet::flat_format::time_index_column_index;
52use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
53use crate::sst::parquet::row_group::ParquetFetchMetrics;
54use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
55
/// Scan statistics collected for a single SST file.
///
/// Values gathered for the same file by different ranges or partitions are
/// combined with `merge_from`, which sums every field.
#[derive(Clone, Default)]
pub struct FileScanMetrics {
    /// How many ranges (row groups) of this file were read.
    pub num_ranges: usize,
    /// How many rows were read from this file.
    pub num_rows: usize,
    /// File-level preparation time spent building the file's ranges/parts.
    pub build_part_cost: Duration,
    /// Reader construction time, summed over every range of this file.
    pub build_reader_cost: Duration,
    /// Scanning time, summed over every range of this file.
    pub scan_cost: Duration,
}
70
71impl fmt::Debug for FileScanMetrics {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
74
75        if self.num_ranges > 0 {
76            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
77        }
78        if self.num_rows > 0 {
79            write!(f, ", \"num_rows\":{}", self.num_rows)?;
80        }
81        if !self.build_reader_cost.is_zero() {
82            write!(
83                f,
84                ", \"build_reader_cost\":\"{:?}\"",
85                self.build_reader_cost
86            )?;
87        }
88        if !self.scan_cost.is_zero() {
89            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
90        }
91
92        write!(f, "}}")
93    }
94}
95
96impl FileScanMetrics {
97    /// Merges another FileMetrics into this one.
98    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
99        self.num_ranges += other.num_ranges;
100        self.num_rows += other.num_rows;
101        self.build_part_cost += other.build_part_cost;
102        self.build_reader_cost += other.build_reader_cost;
103        self.scan_cost += other.scan_cost;
104    }
105}
106
/// Verbose scan metrics for a partition.
///
/// Durations and counters start at zero (via `Default`) and are accumulated by
/// the `merge_*` / `set_*` / `with_*` methods on this type. The `Debug` impl
/// renders them as a single-line, JSON-like object. The `Option` fields stay
/// `None` until the corresponding metrics are merged in.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration to convert [`Batch`]es. `None` until a timer is attached with
    /// `with_convert_cost`.
    convert_cost: Option<Time>,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,
    /// Duration of prefilter in memtable scan.
    mem_prefilter_cost: Duration,
    /// Number of rows filtered by prefilter in memtable scan.
    mem_prefilter_rows_filtered: usize,

    // SST related metrics:
    /// Duration to build file ranges (accumulated from the SST reader's build cost).
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows in row groups filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row groups filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row groups filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of index result cache hits for fulltext index.
    fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    minmax_cache_miss: usize,
    /// Number of pruner builder cache hits.
    pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pruner_prune_cost: Duration,
    /// Number of files filtered by manifest time-range pruning.
    files_time_range_pruned: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of times a send found the channel full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,
    /// Duration spent in divider operations.
    distributor_divider_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory usage for file range builders (a signed running total;
    /// only the peak is reported by `Debug`).
    build_ranges_mem_size: isize,
    /// Peak memory usage for file range builders.
    build_ranges_peak_mem_size: isize,
    /// Current number of file range builders (a signed running total; only
    /// the peak is reported by `Debug`).
    num_range_builders: isize,
    /// Peak number of file range builders.
    num_peak_range_builders: isize,
    /// Total bytes added to the range cache during this scan.
    range_cache_size: usize,
    /// Number of range cache hits during this scan.
    range_cache_hit: usize,
    /// Number of range cache misses during this scan.
    range_cache_miss: usize,
}
262
263/// Wrapper for file metrics that compares by total cost in reverse order.
264/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
265struct CompareCostReverse<'a> {
266    total_cost: Duration,
267    file_id: RegionFileId,
268    metrics: &'a FileScanMetrics,
269}
270
271impl Ord for CompareCostReverse<'_> {
272    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
273        // Reverse comparison: smaller costs are "greater"
274        other.total_cost.cmp(&self.total_cost)
275    }
276}
277
278impl PartialOrd for CompareCostReverse<'_> {
279    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
280        Some(self.cmp(other))
281    }
282}
283
284impl Eq for CompareCostReverse<'_> {}
285
286impl PartialEq for CompareCostReverse<'_> {
287    fn eq(&self, other: &Self) -> bool {
288        self.total_cost == other.total_cost
289    }
290}
291
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a single-line, JSON-like object.
    ///
    /// The core timings and counters are always written. Most other counters
    /// and durations are written only when non-zero, and the optional verbose
    /// metrics only when present and non-empty. Per-file metrics are reduced
    /// to at most 10 files with the highest total cost. The object always ends
    /// with the peak builder statistics and `stream_eof`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure exhaustively so that a new field on `ScanMetricsSet` fails
        // to compile until it is formatted here or explicitly ignored. The
        // running values `build_ranges_mem_size` and `num_range_builders` are
        // ignored on purpose; only their peaks are reported.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            minmax_cache_hit,
            minmax_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            files_time_range_pruned,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            mem_prefilter_cost,
            mem_prefilter_rows_filtered,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
            range_cache_size,
            range_cache_hit,
            range_cache_miss,
        } = self;

        // Write core metrics. These are always present; this write also opens
        // the object with `{`, which the final write below closes.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write convert_cost if present
        if let Some(time) = convert_cost {
            // `time.value()` is interpreted as nanoseconds so the cost renders
            // in the same `Duration` debug format as the other costs.
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Write non-zero filter counters
        if *files_time_range_pruned > 0 {
            write!(f, ", \"files_time_range_pruned\":{files_time_range_pruned}")?;
        }
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *minmax_cache_hit > 0 {
            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
        }
        if *minmax_cache_miss > 0 {
            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }
        if !mem_prefilter_cost.is_zero() {
            write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
        }
        if *mem_prefilter_rows_filtered > 0 {
            write!(
                f,
                ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
            )?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty (i.e. the merge recorded scan time)
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty (i.e. dedup recorded time)
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
            // The heap never holds more than 10 entries; its top is always the
            // cheapest retained file, which is the one to evict.
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // If the file has been pruned by a pruner, the build part cost may be zero.
                // If we didn't read any ranges from it, we don't output the file.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // If current cost is higher than the minimum in our top-10, replace it
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // `into_sorted_vec` sorts ascending by the reversed ordering, so the
            // files come out from highest to lowest total cost. Files with equal
            // cost may appear in any order (map iteration order is unspecified).
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        if *range_cache_size > 0 {
            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
        }
        if *range_cache_hit > 0 {
            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
        }
        if *range_cache_miss > 0 {
            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
        }

        // Always-present trailer; also closes the object opened above.
        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
             \"num_peak_range_builders\":{num_peak_range_builders}, \
             \"stream_eof\":{stream_eof}}}"
        )
    }
}
640impl ScanMetricsSet {
641    /// Attaches the `prepare_scan_cost` to the metrics set.
642    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
643        self.prepare_scan_cost += cost;
644        self
645    }
646
647    /// Attaches the `convert_cost` to the metrics set.
648    fn with_convert_cost(mut self, time: Time) -> Self {
649        self.convert_cost = Some(time);
650        self
651    }
652
653    /// Merges the local scanner metrics.
654    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
655        let ScannerMetrics {
656            scan_cost,
657            yield_cost,
658            num_batches,
659            num_rows,
660        } = other;
661
662        self.scan_cost += *scan_cost;
663        self.yield_cost += *yield_cost;
664        self.num_rows += *num_rows;
665        self.num_batches += *num_batches;
666    }
667
668    /// Merges the local reader metrics.
669    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
670        let ReaderMetrics {
671            build_cost,
672            filter_metrics:
673                ReaderFilterMetrics {
674                    rg_total,
675                    rg_fulltext_filtered,
676                    rg_inverted_filtered,
677                    rg_minmax_filtered,
678                    rg_bloom_filtered,
679                    rg_vector_filtered,
680                    rows_total,
681                    rows_fulltext_filtered,
682                    rows_inverted_filtered,
683                    rows_bloom_filtered,
684                    rows_vector_filtered,
685                    rows_vector_selected,
686                    rows_precise_filtered,
687                    fulltext_index_cache_hit,
688                    fulltext_index_cache_miss,
689                    inverted_index_cache_hit,
690                    inverted_index_cache_miss,
691                    bloom_filter_cache_hit,
692                    bloom_filter_cache_miss,
693                    minmax_cache_hit,
694                    minmax_cache_miss,
695                    pruner_cache_hit,
696                    pruner_cache_miss,
697                    pruner_prune_cost,
698                    files_time_range_pruned,
699                    inverted_index_apply_metrics,
700                    bloom_filter_apply_metrics,
701                    fulltext_index_apply_metrics,
702                },
703            num_record_batches,
704            num_batches,
705            num_rows,
706            scan_cost,
707            metadata_cache_metrics,
708            fetch_metrics,
709            metadata_mem_size,
710            num_range_builders,
711        } = other;
712
713        self.build_parts_cost += *build_cost;
714        self.sst_scan_cost += *scan_cost;
715
716        self.files_time_range_pruned += *files_time_range_pruned;
717
718        self.rg_total += *rg_total;
719        self.rg_fulltext_filtered += *rg_fulltext_filtered;
720        self.rg_inverted_filtered += *rg_inverted_filtered;
721        self.rg_minmax_filtered += *rg_minmax_filtered;
722        self.rg_bloom_filtered += *rg_bloom_filtered;
723        self.rg_vector_filtered += *rg_vector_filtered;
724
725        self.rows_before_filter += *rows_total;
726        self.rows_fulltext_filtered += *rows_fulltext_filtered;
727        self.rows_inverted_filtered += *rows_inverted_filtered;
728        self.rows_bloom_filtered += *rows_bloom_filtered;
729        self.rows_vector_filtered += *rows_vector_filtered;
730        self.rows_vector_selected += *rows_vector_selected;
731        self.rows_precise_filtered += *rows_precise_filtered;
732
733        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
734        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
735        self.inverted_index_cache_hit += *inverted_index_cache_hit;
736        self.inverted_index_cache_miss += *inverted_index_cache_miss;
737        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
738        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
739        self.minmax_cache_hit += *minmax_cache_hit;
740        self.minmax_cache_miss += *minmax_cache_miss;
741        self.pruner_cache_hit += *pruner_cache_hit;
742        self.pruner_cache_miss += *pruner_cache_miss;
743        self.pruner_prune_cost += *pruner_prune_cost;
744
745        self.num_sst_record_batches += *num_record_batches;
746        self.num_sst_batches += *num_batches;
747        self.num_sst_rows += *num_rows;
748
749        // Merge optional verbose metrics
750        if let Some(metrics) = inverted_index_apply_metrics {
751            self.inverted_index_apply_metrics
752                .get_or_insert_with(InvertedIndexApplyMetrics::default)
753                .merge_from(metrics);
754        }
755        if let Some(metrics) = bloom_filter_apply_metrics {
756            self.bloom_filter_apply_metrics
757                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
758                .merge_from(metrics);
759        }
760        if let Some(metrics) = fulltext_index_apply_metrics {
761            self.fulltext_index_apply_metrics
762                .get_or_insert_with(FulltextIndexApplyMetrics::default)
763                .merge_from(metrics);
764        }
765        if let Some(metrics) = fetch_metrics {
766            self.fetch_metrics
767                .get_or_insert_with(ParquetFetchMetrics::default)
768                .merge_from(metrics);
769        }
770        self.metadata_cache_metrics
771            .get_or_insert_with(MetadataCacheMetrics::default)
772            .merge_from(metadata_cache_metrics);
773
774        // Track memory usage and update peak.
775        self.build_ranges_mem_size += *metadata_mem_size;
776        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
777            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
778        }
779
780        // Track number of builders and update peak.
781        self.num_range_builders += *num_range_builders;
782        if self.num_range_builders > self.num_peak_range_builders {
783            self.num_peak_range_builders = self.num_range_builders;
784        }
785    }
786
787    /// Merges per-file metrics.
788    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
789        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
790        for (file_id, metrics) in other {
791            self_file_metrics
792                .entry(*file_id)
793                .or_default()
794                .merge_from(metrics);
795        }
796    }
797
798    /// Sets distributor metrics.
799    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
800        let SeriesDistributorMetrics {
801            num_series_send_timeout,
802            num_series_send_full,
803            num_rows,
804            num_batches,
805            scan_cost,
806            yield_cost,
807            divider_cost,
808        } = distributor_metrics;
809
810        self.num_series_send_timeout += *num_series_send_timeout;
811        self.num_series_send_full += *num_series_send_full;
812        self.num_distributor_rows += *num_rows;
813        self.num_distributor_batches += *num_batches;
814        self.distributor_scan_cost += *scan_cost;
815        self.distributor_yield_cost += *yield_cost;
816        self.distributor_divider_cost += *divider_cost;
817    }
818
819    /// Observes metrics.
820    fn observe_metrics(&self) {
821        READ_STAGE_ELAPSED
822            .with_label_values(&["prepare_scan"])
823            .observe(self.prepare_scan_cost.as_secs_f64());
824        READ_STAGE_ELAPSED
825            .with_label_values(&["build_reader"])
826            .observe(self.build_reader_cost.as_secs_f64());
827        READ_STAGE_ELAPSED
828            .with_label_values(&["scan"])
829            .observe(self.scan_cost.as_secs_f64());
830        READ_STAGE_ELAPSED
831            .with_label_values(&["yield"])
832            .observe(self.yield_cost.as_secs_f64());
833        if let Some(time) = &self.convert_cost {
834            READ_STAGE_ELAPSED
835                .with_label_values(&["convert"])
836                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
837        }
838        READ_STAGE_ELAPSED
839            .with_label_values(&["total"])
840            .observe(self.total_cost.as_secs_f64());
841        READ_ROWS_RETURN.observe(self.num_rows as f64);
842        READ_BATCHES_RETURN.observe(self.num_batches as f64);
843
844        READ_STAGE_ELAPSED
845            .with_label_values(&["build_parts"])
846            .observe(self.build_parts_cost.as_secs_f64());
847
848        READ_ROW_GROUPS_TOTAL
849            .with_label_values(&["before_filtering"])
850            .inc_by(self.rg_total as u64);
851        READ_ROW_GROUPS_TOTAL
852            .with_label_values(&["fulltext_index_filtered"])
853            .inc_by(self.rg_fulltext_filtered as u64);
854        READ_ROW_GROUPS_TOTAL
855            .with_label_values(&["inverted_index_filtered"])
856            .inc_by(self.rg_inverted_filtered as u64);
857        READ_ROW_GROUPS_TOTAL
858            .with_label_values(&["minmax_index_filtered"])
859            .inc_by(self.rg_minmax_filtered as u64);
860        READ_ROW_GROUPS_TOTAL
861            .with_label_values(&["bloom_filter_index_filtered"])
862            .inc_by(self.rg_bloom_filtered as u64);
863        #[cfg(feature = "vector_index")]
864        READ_ROW_GROUPS_TOTAL
865            .with_label_values(&["vector_index_filtered"])
866            .inc_by(self.rg_vector_filtered as u64);
867
868        PRECISE_FILTER_ROWS_TOTAL
869            .with_label_values(&["parquet"])
870            .inc_by(self.rows_precise_filtered as u64);
871        READ_ROWS_IN_ROW_GROUP_TOTAL
872            .with_label_values(&["before_filtering"])
873            .inc_by(self.rows_before_filter as u64);
874        READ_ROWS_IN_ROW_GROUP_TOTAL
875            .with_label_values(&["fulltext_index_filtered"])
876            .inc_by(self.rows_fulltext_filtered as u64);
877        READ_ROWS_IN_ROW_GROUP_TOTAL
878            .with_label_values(&["inverted_index_filtered"])
879            .inc_by(self.rows_inverted_filtered as u64);
880        READ_ROWS_IN_ROW_GROUP_TOTAL
881            .with_label_values(&["bloom_filter_index_filtered"])
882            .inc_by(self.rows_bloom_filtered as u64);
883        #[cfg(feature = "vector_index")]
884        READ_ROWS_IN_ROW_GROUP_TOTAL
885            .with_label_values(&["vector_index_filtered"])
886            .inc_by(self.rows_vector_filtered as u64);
887    }
888}
889
890struct PartitionMetricsInner {
891    region_id: RegionId,
892    /// Index of the partition to scan.
893    partition: usize,
894    /// Label to distinguish different scan operation.
895    scanner_type: &'static str,
896    /// Query start time.
897    query_start: Instant,
898    /// Whether to use verbose logging.
899    explain_verbose: bool,
900    /// Verbose scan metrics that only log to debug logs by default.
901    metrics: Mutex<ScanMetricsSet>,
902    in_progress_scan: IntGauge,
903
904    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
905    /// Duration to build file ranges.
906    build_parts_cost: Time,
907    /// Duration to build the (merge) reader.
908    build_reader_cost: Time,
909    /// Duration to scan data.
910    scan_cost: Time,
911    /// Duration while waiting for `yield`.
912    yield_cost: Time,
913    /// Duration to convert [`Batch`]es.
914    convert_cost: Time,
915    /// Aggregated compute time reported to DataFusion.
916    elapsed_compute: Time,
917}
918
919impl PartitionMetricsInner {
920    fn on_finish(&self, stream_eof: bool) {
921        let mut metrics = self.metrics.lock().unwrap();
922        if metrics.total_cost.is_zero() {
923            metrics.total_cost = self.query_start.elapsed();
924        }
925        if !metrics.stream_eof {
926            metrics.stream_eof = stream_eof;
927        }
928    }
929}
930
931impl MergeMetricsReport for PartitionMetricsInner {
932    fn report(&self, metrics: &mut MergeMetrics) {
933        let mut scan_metrics = self.metrics.lock().unwrap();
934        // Merge the metrics into scan_metrics
935        scan_metrics.merge_metrics.merge(metrics);
936
937        // Reset the input metrics
938        *metrics = MergeMetrics::default();
939    }
940}
941
942impl DedupMetricsReport for PartitionMetricsInner {
943    fn report(&self, metrics: &mut DedupMetrics) {
944        let mut scan_metrics = self.metrics.lock().unwrap();
945        // Merge the metrics into scan_metrics
946        scan_metrics.dedup_metrics.merge(metrics);
947
948        // Reset the input metrics
949        *metrics = DedupMetrics::default();
950    }
951}
952
953impl Drop for PartitionMetricsInner {
954    fn drop(&mut self) {
955        self.on_finish(false);
956        let metrics = self.metrics.lock().unwrap();
957        metrics.observe_metrics();
958        self.in_progress_scan.dec();
959
960        if self.explain_verbose {
961            common_telemetry::info!(
962                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
963                self.scanner_type,
964                self.region_id,
965                self.partition,
966                metrics,
967            );
968        } else {
969            common_telemetry::debug!(
970                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
971                self.scanner_type,
972                self.region_id,
973                self.partition,
974                metrics,
975            );
976        }
977    }
978}
979
980/// List of PartitionMetrics.
981#[derive(Default)]
982pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
983
984impl PartitionMetricsList {
985    /// Sets a new [PartitionMetrics] at the specified partition.
986    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
987        let mut list = self.0.lock().unwrap();
988        if list.len() <= partition {
989            list.resize(partition + 1, None);
990        }
991        list[partition] = Some(metrics);
992    }
993
994    /// Format verbose metrics for each partition for explain.
995    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
996        let list = self.0.lock().unwrap();
997        write!(f, ", \"metrics_per_partition\": ")?;
998        f.debug_list()
999            .entries(list.iter().filter_map(|p| p.as_ref()))
1000            .finish()?;
1001        write!(f, "}}")
1002    }
1003}
1004
1005/// Metrics while reading a partition.
1006#[derive(Clone)]
1007pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
1008
1009impl PartitionMetrics {
1010    pub(crate) fn new(
1011        region_id: RegionId,
1012        partition: usize,
1013        scanner_type: &'static str,
1014        query_start: Instant,
1015        explain_verbose: bool,
1016        metrics_set: &ExecutionPlanMetricsSet,
1017    ) -> Self {
1018        let partition_str = partition.to_string();
1019        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
1020        in_progress_scan.inc();
1021        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
1022        let metrics = ScanMetricsSet::default()
1023            .with_prepare_scan_cost(query_start.elapsed())
1024            .with_convert_cost(convert_cost.clone());
1025        let inner = PartitionMetricsInner {
1026            region_id,
1027            partition,
1028            scanner_type,
1029            query_start,
1030            explain_verbose,
1031            metrics: Mutex::new(metrics),
1032            in_progress_scan,
1033            build_parts_cost: MetricBuilder::new(metrics_set)
1034                .subset_time("build_parts_cost", partition),
1035            build_reader_cost: MetricBuilder::new(metrics_set)
1036                .subset_time("build_reader_cost", partition),
1037            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
1038            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
1039            convert_cost,
1040            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
1041        };
1042        Self(Arc::new(inner))
1043    }
1044
1045    pub(crate) fn on_first_poll(&self) {
1046        let mut metrics = self.0.metrics.lock().unwrap();
1047        metrics.first_poll = self.0.query_start.elapsed();
1048    }
1049
1050    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
1051        let mut metrics = self.0.metrics.lock().unwrap();
1052        metrics.num_mem_ranges += num;
1053    }
1054
1055    pub fn inc_num_file_ranges(&self, num: usize) {
1056        let mut metrics = self.0.metrics.lock().unwrap();
1057        metrics.num_file_ranges += num;
1058    }
1059
1060    fn record_elapsed_compute(&self, duration: Duration) {
1061        if duration.is_zero() {
1062            return;
1063        }
1064        self.0.elapsed_compute.add_duration(duration);
1065    }
1066
1067    /// Merges `build_reader_cost`.
1068    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
1069        self.0.build_reader_cost.add_duration(cost);
1070
1071        let mut metrics = self.0.metrics.lock().unwrap();
1072        metrics.build_reader_cost += cost;
1073    }
1074
1075    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
1076        self.0.convert_cost.add_duration(cost);
1077        self.record_elapsed_compute(cost);
1078    }
1079
1080    /// Reports memtable scan metrics.
1081    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
1082        let mut metrics = self.0.metrics.lock().unwrap();
1083        metrics.mem_scan_cost += data.scan_cost;
1084        metrics.mem_rows += data.num_rows;
1085        metrics.mem_batches += data.num_batches;
1086        metrics.mem_series += data.total_series;
1087        metrics.mem_prefilter_cost += data.prefilter_cost;
1088        metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
1089    }
1090
1091    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
1092    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
1093        self.0.scan_cost.add_duration(metrics.scan_cost);
1094        self.record_elapsed_compute(metrics.scan_cost);
1095        self.0.yield_cost.add_duration(metrics.yield_cost);
1096        self.record_elapsed_compute(metrics.yield_cost);
1097
1098        let mut metrics_set = self.0.metrics.lock().unwrap();
1099        metrics_set.merge_scanner_metrics(metrics);
1100    }
1101
1102    /// Merges [ReaderMetrics] and `build_reader_cost`.
1103    pub fn merge_reader_metrics(
1104        &self,
1105        metrics: &ReaderMetrics,
1106        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
1107    ) {
1108        self.0.build_parts_cost.add_duration(metrics.build_cost);
1109
1110        let mut metrics_set = self.0.metrics.lock().unwrap();
1111        metrics_set.merge_reader_metrics(metrics);
1112
1113        // Merge per-file metrics if provided
1114        if let Some(file_metrics) = per_file_metrics {
1115            metrics_set.merge_per_file_metrics(file_metrics);
1116        }
1117    }
1118
1119    /// Finishes the query.
1120    pub(crate) fn on_finish(&self) {
1121        self.0.on_finish(true);
1122    }
1123
1124    /// Sets the distributor metrics.
1125    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
1126        let mut metrics_set = self.0.metrics.lock().unwrap();
1127        metrics_set.set_distributor_metrics(metrics);
1128    }
1129
1130    /// Returns whether verbose explain is enabled.
1131    pub(crate) fn explain_verbose(&self) -> bool {
1132        self.0.explain_verbose
1133    }
1134
1135    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
1136    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
1137        self.0.clone()
1138    }
1139
1140    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
1141    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
1142        self.0.clone()
1143    }
1144
1145    /// Increments the total bytes added to the range cache.
1146    #[allow(dead_code)]
1147    pub(crate) fn inc_range_cache_size(&self, size: usize) {
1148        let mut metrics = self.0.metrics.lock().unwrap();
1149        metrics.range_cache_size += size;
1150    }
1151
1152    /// Increments the range cache hit counter.
1153    #[allow(dead_code)]
1154    pub(crate) fn inc_range_cache_hit(&self) {
1155        let mut metrics = self.0.metrics.lock().unwrap();
1156        metrics.range_cache_hit += 1;
1157    }
1158
1159    /// Increments the range cache miss counter.
1160    #[allow(dead_code)]
1161    pub(crate) fn inc_range_cache_miss(&self) {
1162        let mut metrics = self.0.metrics.lock().unwrap();
1163        metrics.range_cache_miss += 1;
1164    }
1165}
1166
1167impl fmt::Debug for PartitionMetrics {
1168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1169        let metrics = self.0.metrics.lock().unwrap();
1170        write!(
1171            f,
1172            r#"{{"partition":{}, "metrics":{:?}}}"#,
1173            self.0.partition, metrics
1174        )
1175    }
1176}
1177
/// Metrics for the series distributor.
///
/// Accumulated into a partition's verbose scan metrics through
/// `PartitionMetrics::set_distributor_metrics`.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of "send full" events in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows scanned by the series distributor.
    pub(crate) num_rows: usize,
    /// Number of batches scanned by the series distributor.
    pub(crate) num_batches: usize,
    /// Time the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the series distributor spent yielding.
    pub(crate) yield_cost: Duration,
    /// Time spent in divider operations.
    pub(crate) divider_cost: Duration,
}
1196
1197/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
1198#[tracing::instrument(
1199    skip_all,
1200    fields(
1201        region_id = %stream_ctx.input.region_metadata().region_id,
1202        row_group_index = %index.index,
1203        source = "mem_flat"
1204    )
1205)]
1206pub(crate) fn scan_flat_mem_ranges(
1207    stream_ctx: Arc<StreamContext>,
1208    part_metrics: PartitionMetrics,
1209    index: RowGroupIndex,
1210    time_range: FileTimeRange,
1211) -> impl Stream<Item = Result<RecordBatch>> {
1212    try_stream! {
1213        let ranges = stream_ctx.input.build_mem_ranges(index);
1214        part_metrics.inc_num_mem_ranges(ranges.len());
1215        for range in ranges {
1216            let build_reader_start = Instant::now();
1217            let mem_scan_metrics = Some(MemScanMetrics::default());
1218            let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
1219            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1220
1221            while let Some(record_batch) = iter.next().transpose()? {
1222                yield record_batch;
1223            }
1224
1225            // Report the memtable scan metrics to partition metrics
1226            if let Some(ref metrics) = mem_scan_metrics {
1227                let data = metrics.data();
1228                part_metrics.report_mem_scan_metrics(&data);
1229            }
1230        }
1231    }
1232}
1233
1234/// Files with row count greater than this threshold can contribute to the estimation.
1235const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
1236/// Number of series threshold for splitting batches.
1237const NUM_SERIES_THRESHOLD: u64 = 10240;
1238/// Minimum batch size after splitting. The batch size is less than 60 because a series may only have
1239/// 60 samples per hour.
1240const BATCH_SIZE_THRESHOLD: u64 = 50;
1241
1242/// Returns the estimated rows per batch after splitting if splitting flat record batches
1243/// may improve merge performance. Returns `None` if splitting is not beneficial.
1244pub(crate) fn should_split_flat_batches_for_merge(
1245    stream_ctx: &Arc<StreamContext>,
1246    range_meta: &RangeMeta,
1247) -> Option<usize> {
1248    // Number of files to split and scan.
1249    let mut num_files_to_split = 0;
1250    let mut num_mem_rows = 0;
1251    let mut num_mem_series = 0;
1252    // Total rows and series for estimating batch size after splitting.
1253    let mut total_rows: u64 = 0;
1254    let mut total_series: u64 = 0;
1255    // Checks each file range, returns early if any range is not splittable.
1256    // For mem ranges, we collect the total number of rows and series because the number of rows in a
1257    // mem range may be too small.
1258    for index in &range_meta.row_group_indices {
1259        if stream_ctx.is_mem_range_index(*index) {
1260            let memtable = &stream_ctx.input.memtables[index.index];
1261            // Is mem range
1262            let stats = memtable.stats();
1263            num_mem_rows += stats.num_rows();
1264            num_mem_series += stats.series_count();
1265        } else if stream_ctx.is_file_range_index(*index) {
1266            // This is a file range.
1267            let file_index = index.index - stream_ctx.input.num_memtables();
1268            let file = &stream_ctx.input.files[file_index];
1269            let file_meta = file.meta_ref();
1270            if file_meta.level == 0 {
1271                // Always split level 0 files.
1272                num_files_to_split += 1;
1273                continue;
1274            } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
1275                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
1276                continue;
1277            }
1278            debug_assert!(file_meta.num_rows > 0);
1279            if !can_split_series(file_meta.num_rows, file_meta.num_series) {
1280                // We can't split batches in a file.
1281                common_telemetry::trace!(
1282                    "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
1283                    file_meta.file_id,
1284                    file_meta.level,
1285                    file_meta.num_rows,
1286                    file_meta.num_series,
1287                );
1288                return None;
1289            } else {
1290                num_files_to_split += 1;
1291                total_rows += file.meta_ref().num_rows;
1292                total_series += file.meta_ref().num_series;
1293            }
1294        }
1295        // Skips non-file and non-mem ranges.
1296    }
1297
1298    let should_split = if num_files_to_split > 0 {
1299        // We mainly consider file ranges because they have enough data for sampling.
1300        true
1301    } else if num_mem_series > 0
1302        && num_mem_rows > 0
1303        && can_split_series(num_mem_rows as u64, num_mem_series as u64)
1304    {
1305        total_rows += num_mem_rows as u64;
1306        total_series += num_mem_series as u64;
1307        true
1308    } else {
1309        false
1310    };
1311
1312    if !should_split {
1313        return None;
1314    }
1315
1316    // Estimate rows per batch after splitting.
1317    let estimated_batch_size = if total_series > 0 && total_rows > 0 {
1318        ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
1319    } else {
1320        // No valid estimate available, use a conservative fallback.
1321        DEFAULT_READ_BATCH_SIZE / 4
1322    };
1323    Some(estimated_batch_size)
1324}
1325
1326/// Computes the channel size for parallel scan based on the estimated rows per batch.
1327/// The channel should buffer approximately `2 * DEFAULT_READ_BATCH_SIZE` rows.
1328pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
1329    let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
1330    size.clamp(2, 64)
1331}
1332
1333/// Computes the average estimated rows per batch across multiple range readers.
1334pub(crate) fn compute_average_batch_size(
1335    estimated_rows_per_batch: impl IntoIterator<Item = usize>,
1336) -> usize {
1337    let mut total = 0usize;
1338    let mut count = 0usize;
1339    for size in estimated_rows_per_batch {
1340        total += size;
1341        count += 1;
1342    }
1343
1344    if count == 0 {
1345        return DEFAULT_READ_BATCH_SIZE;
1346    }
1347
1348    (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
1349}
1350
1351fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1352    if num_rows == 0 || num_series == 0 {
1353        return false;
1354    }
1355
1356    // It doesn't have too many series or it will have enough rows for each batch.
1357    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1358}
1359
#[cfg(test)]
mod split_tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use smallvec::smallvec;
    use store_api::storage::FileId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{ScanInput, StreamContext};
    use crate::sst::file::FileHandle;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    /// Builds a [StreamContext] that scans `files` and has no prebuilt ranges.
    async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);

        StreamContext {
            input,
            ranges: vec![],
            scan_fingerprint: None,
            scan_implied_time_range: None,
            query_start: std::time::Instant::now(),
        }
    }

    /// Returns a range covering row group 0 of the source at index 0.
    fn single_file_range_meta() -> RangeMeta {
        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 1024,
        }
    }

    #[tokio::test]
    async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
        let mut file = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
            .meta_ref()
            .clone();
        file.level = 0;
        file.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
        file.num_row_groups = 1;
        file.num_series = 0;

        let file = FileHandle::new(file, crate::test_util::new_noop_file_purger());
        let stream_ctx = Arc::new(new_stream_context_with_files(vec![file]).await);

        assert!(
            should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta()).is_some()
        );
    }

    #[test]
    fn can_split_series_returns_false_for_zero_inputs() {
        assert!(!can_split_series(0, 1));
        assert!(!can_split_series(1, 0));
        assert!(!can_split_series(0, 0));
    }

    #[test]
    fn can_split_series_checks_series_count_and_rows_per_series() {
        // Fewer series than the threshold: always splittable.
        assert!(can_split_series(1, 1));
        assert!(can_split_series(100, NUM_SERIES_THRESHOLD - 1));
        // Many series, each with enough rows.
        assert!(can_split_series(
            NUM_SERIES_THRESHOLD * BATCH_SIZE_THRESHOLD,
            NUM_SERIES_THRESHOLD,
        ));
        // Many series, each with too few rows.
        assert!(!can_split_series(
            NUM_SERIES_THRESHOLD * BATCH_SIZE_THRESHOLD - 1,
            NUM_SERIES_THRESHOLD,
        ));
    }

    #[test]
    fn compute_parallel_channel_size_is_clamped() {
        // A zero estimate is treated as one row per batch.
        assert_eq!(
            compute_parallel_channel_size(0),
            compute_parallel_channel_size(1)
        );
        assert_eq!(
            compute_parallel_channel_size(1),
            (2 * DEFAULT_READ_BATCH_SIZE).clamp(2, 64)
        );
        assert_eq!(compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE), 2);
        assert_eq!(compute_parallel_channel_size(usize::MAX), 2);
    }

    #[test]
    fn compute_average_batch_size_handles_empty_and_clamps() {
        assert_eq!(
            compute_average_batch_size(std::iter::empty::<usize>()),
            DEFAULT_READ_BATCH_SIZE
        );
        assert_eq!(compute_average_batch_size([2, 4]), 3);
        assert_eq!(compute_average_batch_size([0, 0]), 1);
        assert_eq!(
            compute_average_batch_size([DEFAULT_READ_BATCH_SIZE * 2]),
            DEFAULT_READ_BATCH_SIZE
        );
    }
}
1435
1436/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
1437/// based on the `explain_verbose` flag.
1438pub(crate) fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1439    if explain_verbose {
1440        ReaderFilterMetrics {
1441            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1442            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1443            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1444            ..Default::default()
1445        }
1446    } else {
1447        ReaderFilterMetrics::default()
1448    }
1449}
1450
1451/// Scans file ranges at `index` using flat reader that returns RecordBatch.
1452#[tracing::instrument(
1453    skip_all,
1454    fields(
1455        region_id = %stream_ctx.input.region_metadata().region_id,
1456        row_group_index = %index.index,
1457        source = read_type
1458    )
1459)]
1460pub(crate) async fn scan_flat_file_ranges(
1461    stream_ctx: Arc<StreamContext>,
1462    part_metrics: PartitionMetrics,
1463    index: RowGroupIndex,
1464    read_type: &'static str,
1465    partition_pruner: Arc<PartitionPruner>,
1466) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1467    let mut reader_metrics = ReaderMetrics {
1468        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1469        ..Default::default()
1470    };
1471    let ranges = partition_pruner
1472        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1473        .await?;
1474    part_metrics.inc_num_file_ranges(ranges.len());
1475    part_metrics.merge_reader_metrics(&reader_metrics, None);
1476
1477    // Creates initial per-file metrics with build_part_cost.
1478    let init_per_file_metrics = if part_metrics.explain_verbose() {
1479        let file = stream_ctx.input.file_from_index(index);
1480        let file_id = file.file_id();
1481
1482        let mut map = HashMap::new();
1483        map.insert(
1484            file_id,
1485            FileScanMetrics {
1486                build_part_cost: reader_metrics.build_cost,
1487                ..Default::default()
1488            },
1489        );
1490        Some(map)
1491    } else {
1492        None
1493    };
1494
1495    Ok(build_flat_file_range_scan_stream(
1496        stream_ctx,
1497        part_metrics,
1498        read_type,
1499        ranges,
1500        init_per_file_metrics,
1501    ))
1502}
1503
1504/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
1505#[tracing::instrument(
1506    skip_all,
1507    fields(read_type = read_type, range_count = ranges.len())
1508)]
1509pub fn build_flat_file_range_scan_stream(
1510    stream_ctx: Arc<StreamContext>,
1511    part_metrics: PartitionMetrics,
1512    read_type: &'static str,
1513    ranges: SmallVec<[FileRange; 2]>,
1514    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1515) -> impl Stream<Item = Result<RecordBatch>> {
1516    try_stream! {
1517        let fetch_metrics = if part_metrics.explain_verbose() {
1518            Some(Arc::new(ParquetFetchMetrics::default()))
1519        } else {
1520            None
1521        };
1522        let reader_metrics = &mut ReaderMetrics {
1523            fetch_metrics: fetch_metrics.clone(),
1524            ..Default::default()
1525        };
1526        for range in ranges {
1527            let build_reader_start = Instant::now();
1528            let Some(mut reader) = range
1529                .flat_reader(
1530                    stream_ctx.input.series_row_selector,
1531                    fetch_metrics.as_deref(),
1532                )
1533                .await?
1534            else {
1535                continue;
1536            };
1537            let build_cost = build_reader_start.elapsed();
1538            part_metrics.inc_build_reader_cost(build_cost);
1539
1540            let may_compat = range.compat_batch();
1541
1542            let mapper = range.compaction_projection_mapper();
1543            while let Some(record_batch) = reader.next_batch().await? {
1544                let record_batch = if let Some(mapper) = mapper {
1545                    let batch = mapper.project(record_batch)?;
1546                    batch
1547                } else {
1548                    record_batch
1549                };
1550
1551                if let Some(flat_compat) = may_compat {
1552                    let batch = flat_compat.compat(record_batch)?;
1553                    yield batch;
1554                } else {
1555                    yield record_batch;
1556                }
1557            }
1558
1559            let prune_metrics = reader.metrics();
1560
1561            // Update per-file metrics if tracking is enabled
1562            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1563                let file_id = range.file_handle().file_id();
1564                let file_metrics = file_metrics_map
1565                    .entry(file_id)
1566                    .or_insert_with(FileScanMetrics::default);
1567
1568                file_metrics.num_ranges += 1;
1569                file_metrics.num_rows += prune_metrics.num_rows;
1570                file_metrics.build_reader_cost += build_cost;
1571                file_metrics.scan_cost += prune_metrics.scan_cost;
1572            }
1573
1574            reader_metrics.merge_from(&prune_metrics);
1575        }
1576
1577        // Reports metrics.
1578        reader_metrics.observe_rows(read_type);
1579        reader_metrics.filter_metrics.observe();
1580        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1581    }
1582}
1583
/// Builds the stream that scans the flat-format extension range selected by `index`.
///
/// Errors from the extension reader are wrapped with `ScanExternalRangeSnafu`.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
    options: crate::extension::ExtensionRangeReadOptions,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    let extension_range = context.input.extension_range(index.index);
    let flat_reader = extension_range.flat_reader(context.as_ref(), options);
    let record_batches = flat_reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(record_batches)
}
1602
1603pub(crate) async fn maybe_scan_flat_other_ranges(
1604    context: &Arc<StreamContext>,
1605    index: RowGroupIndex,
1606    metrics: &PartitionMetrics,
1607    pre_filter_mode: PreFilterMode,
1608) -> Result<BoxedRecordBatchStream> {
1609    #[cfg(feature = "enterprise")]
1610    {
1611        let options = crate::extension::ExtensionRangeReadOptions { pre_filter_mode };
1612        scan_flat_extension_range(context.clone(), index, metrics.clone(), options).await
1613    }
1614
1615    #[cfg(not(feature = "enterprise"))]
1616    {
1617        let _ = context;
1618        let _ = index;
1619        let _ = metrics;
1620        let _ = pre_filter_mode;
1621
1622        crate::error::UnexpectedSnafu {
1623            reason: "no other ranges scannable in flat format",
1624        }
1625        .fail()
1626    }
1627}
1628
1629/// A stream wrapper that splits record batches from an inner stream.
1630pub(crate) struct SplitRecordBatchStream<S> {
1631    /// The inner stream that yields record batches.
1632    inner: S,
1633    /// Buffer for split batches.
1634    batches: VecDeque<RecordBatch>,
1635}
1636
1637impl<S> SplitRecordBatchStream<S> {
1638    /// Creates a new splitting stream wrapper.
1639    pub(crate) fn new(inner: S) -> Self {
1640        Self {
1641            inner,
1642            batches: VecDeque::new(),
1643        }
1644    }
1645}
1646
1647impl<S> Stream for SplitRecordBatchStream<S>
1648where
1649    S: Stream<Item = Result<RecordBatch>> + Unpin,
1650{
1651    type Item = Result<RecordBatch>;
1652
1653    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1654        loop {
1655            // First, check if we have buffered split batches
1656            if let Some(batch) = self.batches.pop_front() {
1657                return Poll::Ready(Some(Ok(batch)));
1658            }
1659
1660            // Poll the inner stream for the next batch
1661            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1662                Some(Ok(batch)) => batch,
1663                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1664                None => return Poll::Ready(None),
1665            };
1666
1667            // Split the batch and buffer the results
1668            split_record_batch(record_batch, &mut self.batches);
1669            // Continue the loop to return the first split batch
1670        }
1671    }
1672}
1673
1674/// Splits the batch so each sub-batch has strictly increasing timestamps.
1675///
1676/// # Panics
1677/// Panics if the timestamp array is invalid.
1678pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1679    let batch_rows = record_batch.num_rows();
1680    if batch_rows == 0 {
1681        return;
1682    }
1683    if batch_rows < 2 {
1684        batches.push_back(record_batch);
1685        return;
1686    }
1687
1688    let time_index_pos = time_index_column_index(record_batch.num_columns());
1689    let timestamps = record_batch.column(time_index_pos);
1690    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1691    let mut offsets = Vec::with_capacity(16);
1692    offsets.push(0);
1693    let values = ts_values.values();
1694    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1695        if value >= values[i + 1] {
1696            offsets.push(i + 1);
1697        }
1698    }
1699    offsets.push(values.len());
1700
1701    // Splits the batch by offsets.
1702    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1703        let end = offsets[i + 1];
1704        let rows_in_batch = end - start;
1705        batches.push_back(record_batch.slice(start, rows_in_batch));
1706    }
1707}
1708
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use smallvec::{SmallVec, smallvec};
    use store_api::storage::RegionId;

    use super::*;
    use crate::cache::CacheStrategy;
    use crate::memtable::{
        BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
        MemtableRangeContext, MemtableStats,
    };
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{MemRangeBuilder, SourceIndex};
    use crate::read::scan_region::ScanInput;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    struct EmptyIterBuilder;

    impl IterBuilder for EmptyIterBuilder {
        fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }

        fn is_record_batch(&self) -> bool {
            true
        }

        fn build_record_batch(
            &self,
            _time_range: Option<(Timestamp, Timestamp)>,
            _metrics: Option<MemScanMetrics>,
        ) -> Result<BoxedRecordBatchIterator> {
            Ok(Box::new(std::iter::empty()))
        }
    }

    async fn new_test_stream_ctx(
        files: Vec<FileHandle>,
        memtables: Vec<MemRangeBuilder>,
    ) -> Arc<StreamContext> {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_cache(CacheStrategy::Disabled)
            .with_memtables(memtables)
            .with_files(files);

        Arc::new(StreamContext {
            input,
            ranges: Vec::new(),
            scan_fingerprint: None,
            scan_implied_time_range: None,
            query_start: Instant::now(),
        })
    }

    fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
        let meta = FileMeta {
            region_id: RegionId::new(123, 456),
            file_id: Default::default(),
            level: 1,
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            num_rows,
            num_series,
            ..Default::default()
        };
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
        let context = Arc::new(MemtableRangeContext::new(
            0,
            Box::new(EmptyIterBuilder),
            Default::default(),
        ));
        let stats = MemtableStats {
            time_range: Some((
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            )),
            num_rows,
            num_ranges: 1,
            series_count,
            ..Default::default()
        };
        let range = MemtableRange::new(context, stats.clone());
        MemRangeBuilder::new(range, stats)
    }

    fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
        let indices = row_group_indices
            .iter()
            .map(|row_group_index| SourceIndex {
                index: row_group_index.index,
                num_row_groups: 1,
            })
            .collect();

        RangeMeta {
            time_range: (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            indices,
            row_group_indices,
            num_rows: 0,
        }
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
        let num_rows = SPLIT_ROW_THRESHOLD * 2;
        let num_series = (num_rows / 100).max(1);
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some((num_rows / num_series) as usize),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
        let stream_ctx = new_test_stream_ctx(
            vec![
                new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
                new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
            ],
            vec![],
        )
        .await;
        let range_meta = new_test_range_meta(smallvec![
            RowGroupIndex {
                index: 0,
                row_group_index: 0,
            },
            RowGroupIndex {
                index: 1,
                row_group_index: 0,
            }
        ]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
        let num_series =
            (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            None,
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
        let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(50),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[tokio::test]
    async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
        let stream_ctx =
            new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
        let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
            index: 0,
            row_group_index: 0,
        }]);

        assert_eq!(
            Some(DEFAULT_READ_BATCH_SIZE),
            should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
        assert_eq!(64, compute_parallel_channel_size(0));
        assert_eq!(64, compute_parallel_channel_size(1));
    }

    #[test]
    fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
        assert_eq!(
            4,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
        );
    }

    #[test]
    fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
        assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
        assert_eq!(
            2,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
        );
    }

    #[test]
    fn test_compute_average_batch_size_uses_arithmetic_mean() {
        assert_eq!(24, compute_average_batch_size([16, 24, 32]));
    }

    #[test]
    fn test_compute_average_batch_size_clamps_values() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2])
        );
        assert_eq!(1, compute_average_batch_size([0, 1]));
    }

    #[test]
    fn test_compute_average_batch_size_falls_back_when_empty() {
        assert_eq!(
            DEFAULT_READ_BATCH_SIZE,
            compute_average_batch_size(std::iter::empty())
        );
    }

    /// Builds a flat-format record batch whose time index column holds `timestamps`.
    fn flat_ts_batch(timestamps: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
        use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

        let num_rows = timestamps.len();
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("pk", DataType::UInt64, false),
            Field::new("seq", DataType::UInt64, false),
            Field::new("op", DataType::UInt8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())),
                Arc::new(UInt64Array::from(vec![0u64; num_rows])),
                Arc::new(UInt64Array::from(vec![0u64; num_rows])),
                Arc::new(UInt8Array::from(vec![0u8; num_rows])),
            ],
        )
        .unwrap()
    }

    /// Returns the time index values of a flat-format `batch`.
    fn ts_values(batch: &RecordBatch) -> Vec<i64> {
        let pos = time_index_column_index(batch.num_columns());
        let (values, _) = timestamp_array_to_primitive(batch.column(pos)).unwrap();
        values.values().to_vec()
    }

    /// Splits `batch` and returns the time index values of each sub-batch.
    fn split_batch(batch: RecordBatch) -> Vec<Vec<i64>> {
        let mut batches = VecDeque::new();
        split_record_batch(batch, &mut batches);
        batches.iter().map(ts_values).collect()
    }

    /// Splits `timestamps` and returns the time index values of each sub-batch.
    fn split_ts(timestamps: &[i64]) -> Vec<Vec<i64>> {
        split_batch(flat_ts_batch(timestamps))
    }

    #[test]
    fn test_split_record_batch_on_equal_timestamps() {
        // Splits on both decreasing and equal timestamps.
        assert_eq!(
            split_ts(&[1, 2, 2, 3, 1]),
            vec![vec![1, 2], vec![2, 3], vec![1]]
        );
        // A run of equal timestamps yields single-row sub-batches.
        assert_eq!(split_ts(&[5, 5, 5]), vec![vec![5], vec![5], vec![5]]);
        // Equal-ts run at the leading edge of the batch.
        assert_eq!(split_ts(&[5, 5, 1, 2]), vec![vec![5], vec![5], vec![1, 2]]);
        // Equal-ts run at the trailing edge of the batch.
        assert_eq!(split_ts(&[1, 2, 5, 5]), vec![vec![1, 2, 5], vec![5]]);
    }

    #[test]
    fn test_split_record_batch_on_decreasing_timestamps() {
        assert_eq!(split_ts(&[1, 2, 3]), vec![vec![1, 2, 3]]);
        assert_eq!(split_ts(&[1, 3, 2, 4]), vec![vec![1, 3], vec![2, 4]]);
    }

    #[test]
    fn test_split_record_batch_empty_and_single_row() {
        let mut batches = VecDeque::new();
        split_record_batch(flat_ts_batch(&[]), &mut batches);
        assert!(batches.is_empty());

        assert_eq!(split_ts(&[42]), vec![vec![42]]);
    }

    #[test]
    fn test_split_record_batch_on_sliced_input() {
        // Only the rows visible through the slice (offset 1, length 4) take part in the
        // split; the hidden leading `9` and trailing `0` must not create extra boundaries.
        let batch = flat_ts_batch(&[9, 1, 2, 2, 3, 0]).slice(1, 4);
        assert_eq!(split_batch(batch), vec![vec![1, 2], vec![2, 3]]);
    }

    #[tokio::test]
    async fn test_split_record_batch_stream_splits_each_inner_batch() {
        use futures::StreamExt;

        // Covers a batch needing a split, an empty batch (yields nothing), a single-row
        // batch and a run of equal timestamps, all flowing through one wrapper.
        let inner_batches: Vec<Result<RecordBatch>> = vec![
            Ok(flat_ts_batch(&[1, 2, 1])),
            Ok(flat_ts_batch(&[])),
            Ok(flat_ts_batch(&[7])),
            Ok(flat_ts_batch(&[3, 3])),
        ];
        let stream = SplitRecordBatchStream::new(futures::stream::iter(inner_batches));
        let split: Vec<Vec<i64>> = stream.map(|batch| ts_values(&batch.unwrap())).collect().await;

        assert_eq!(
            split,
            vec![vec![1, 2], vec![1], vec![7], vec![3], vec![3]]
        );
    }
}