// mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use futures::Stream;
30use prometheus::IntGauge;
31use smallvec::SmallVec;
32use snafu::OptionExt;
33use store_api::storage::RegionId;
34
35use crate::error::{Result, UnexpectedSnafu};
36use crate::memtable::MemScanMetrics;
37use crate::metrics::{
38    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
39    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
40};
41use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
42use crate::read::merge::{MergeMetrics, MergeMetricsReport};
43use crate::read::pruner::PartitionPruner;
44use crate::read::range::{RangeMeta, RowGroupIndex};
45use crate::read::scan_region::StreamContext;
46use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
47use crate::sst::file::{FileTimeRange, RegionFileId};
48use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
49use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
50use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
51use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
52use crate::sst::parquet::file_range::FileRange;
53use crate::sst::parquet::flat_format::time_index_column_index;
54use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
55use crate::sst::parquet::row_group::ParquetFetchMetrics;
56
/// Per-file scan metrics.
///
/// Collected for a single SST file during a scan and merged across ranges
/// (and partitions) via [`FileScanMetrics::merge_from`].
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}
71
72impl fmt::Debug for FileScanMetrics {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
75
76        if self.num_ranges > 0 {
77            write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
78        }
79        if self.num_rows > 0 {
80            write!(f, ", \"num_rows\":{}", self.num_rows)?;
81        }
82        if !self.build_reader_cost.is_zero() {
83            write!(
84                f,
85                ", \"build_reader_cost\":\"{:?}\"",
86                self.build_reader_cost
87            )?;
88        }
89        if !self.scan_cost.is_zero() {
90            write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
91        }
92
93        write!(f, "}}")
94    }
95}
96
impl FileScanMetrics {
    /// Merges another [`FileScanMetrics`] into this one by summing every
    /// counter and accumulated duration.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        self.num_ranges += other.num_ranges;
        self.num_rows += other.num_rows;
        self.build_part_cost += other.build_part_cost;
        self.build_reader_cost += other.build_reader_cost;
        self.scan_cost += other.scan_cost;
    }
}
107
/// Verbose scan metrics for a partition.
///
/// The `Debug` implementation renders these as a JSON-like object for
/// explain-verbose output, omitting zero/empty fields.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration to convert [`Batch`]es.
    convert_cost: Option<Time>,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Duration to scan SST files.
    sst_scan_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    rg_vector_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of index result cache hits for fulltext index.
    fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    minmax_cache_miss: usize,
    /// Number of pruner builder cache hits.
    pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pruner_prune_cost: Duration,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeout in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,
    /// Duration spent in divider operations.
    distributor_divider_cost: Duration,

    /// Merge metrics.
    merge_metrics: MergeMetrics,
    /// Dedup metrics.
    dedup_metrics: DedupMetrics,

    /// Whether the stream reached EOF.
    stream_eof: bool,

    // Optional verbose metrics:
    /// Inverted index apply metrics.
    inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Bloom filter index apply metrics.
    bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Fulltext index apply metrics.
    fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
    /// Parquet fetch metrics.
    fetch_metrics: Option<ParquetFetchMetrics>,
    /// Metadata cache metrics.
    metadata_cache_metrics: Option<MetadataCacheMetrics>,
    /// Per-file scan metrics, only populated when explain_verbose is true.
    per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,

    /// Current memory usage for file range builders.
    build_ranges_mem_size: isize,
    /// Peak memory usage for file range builders.
    build_ranges_peak_mem_size: isize,
    /// Current number of file range builders.
    num_range_builders: isize,
    /// Peak number of file range builders.
    num_peak_range_builders: isize,
    /// Total bytes added to the range cache during this scan.
    range_cache_size: usize,
    /// Number of range cache hits during this scan.
    range_cache_hit: usize,
    /// Number of range cache misses during this scan.
    range_cache_miss: usize,
}
257
/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
    /// Sort key: sum of build part, build reader and scan costs for the file.
    total_cost: Duration,
    /// Id of the file the metrics belong to.
    file_id: RegionFileId,
    /// Borrowed metrics of the file, printed when the entry stays in the top-K.
    metrics: &'a FileScanMetrics,
}
265
266impl Ord for CompareCostReverse<'_> {
267    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
268        // Reverse comparison: smaller costs are "greater"
269        other.total_cost.cmp(&self.total_cost)
270    }
271}
272
273impl PartialOrd for CompareCostReverse<'_> {
274    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
275        Some(self.cmp(other))
276    }
277}
278
279impl Eq for CompareCostReverse<'_> {}
280
281impl PartialEq for CompareCostReverse<'_> {
282    fn eq(&self, other: &Self) -> bool {
283        self.total_cost == other.total_cost
284    }
285}
286
impl fmt::Debug for ScanMetricsSet {
    // Formats the metrics set as a JSON-like object for explain-verbose
    // output. Core metrics are always written; optional metrics and
    // zero-valued counters are omitted to keep the output compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to `ScanMetricsSet` forces
        // this formatter to be revisited at compile time.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            convert_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            sst_scan_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rg_vector_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_vector_filtered,
            rows_vector_selected,
            rows_precise_filtered,
            fulltext_index_cache_hit,
            fulltext_index_cache_miss,
            inverted_index_cache_hit,
            inverted_index_cache_miss,
            bloom_filter_cache_hit,
            bloom_filter_cache_miss,
            minmax_cache_hit,
            minmax_cache_miss,
            pruner_cache_hit,
            pruner_cache_miss,
            pruner_prune_cost,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            distributor_divider_cost,
            merge_metrics,
            dedup_metrics,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
            inverted_index_apply_metrics,
            bloom_filter_apply_metrics,
            fulltext_index_apply_metrics,
            fetch_metrics,
            metadata_cache_metrics,
            per_file_metrics,
            build_ranges_mem_size: _,
            build_ranges_peak_mem_size,
            num_range_builders: _,
            num_peak_range_builders,
            range_cache_size,
            range_cache_hit,
            range_cache_miss,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"sst_scan_cost\":\"{sst_scan_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write convert_cost if present
        if let Some(time) = convert_cost {
            // `Time` stores nanoseconds; convert to `Duration` for display.
            let duration = Duration::from_nanos(time.value() as u64);
            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
        }

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rg_vector_filtered > 0 {
            write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_vector_filtered > 0 {
            write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
        }
        if *rows_vector_selected > 0 {
            write!(f, ", \"rows_vector_selected\":{rows_vector_selected}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }
        if *fulltext_index_cache_hit > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
            )?;
        }
        if *fulltext_index_cache_miss > 0 {
            write!(
                f,
                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
            )?;
        }
        if *inverted_index_cache_hit > 0 {
            write!(
                f,
                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
            )?;
        }
        if *inverted_index_cache_miss > 0 {
            write!(
                f,
                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
            )?;
        }
        if *bloom_filter_cache_hit > 0 {
            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
        }
        if *bloom_filter_cache_miss > 0 {
            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
        }
        if *minmax_cache_hit > 0 {
            write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?;
        }
        if *minmax_cache_miss > 0 {
            write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?;
        }
        if *pruner_cache_hit > 0 {
            write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?;
        }
        if *pruner_cache_miss > 0 {
            write!(f, ", \"pruner_cache_miss\":{pruner_cache_miss}")?;
        }
        if !pruner_prune_cost.is_zero() {
            write!(f, ", \"pruner_prune_cost\":\"{pruner_prune_cost:?}\"")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }
        if !distributor_divider_cost.is_zero() {
            write!(
                f,
                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Write optional verbose metrics if they are not empty
        if let Some(metrics) = inverted_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = bloom_filter_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fulltext_index_apply_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = fetch_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
        }
        if let Some(metrics) = metadata_cache_metrics
            && !metrics.is_empty()
        {
            write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
        }

        // Write merge metrics if not empty
        if !merge_metrics.scan_cost.is_zero() {
            write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
        }

        // Write dedup metrics if not empty
        if !dedup_metrics.dedup_cost.is_zero() {
            write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
        }

        // Write top file metrics if present and non-empty
        if let Some(file_metrics) = per_file_metrics
            && !file_metrics.is_empty()
        {
            // Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
            let mut heap = BinaryHeap::new();
            for (file_id, metrics) in file_metrics.iter() {
                let total_cost =
                    metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;

                // If the file has been pruned by a pruner, the build part cost may be zero.
                // If we didn't read any ranges from it, we don't output the file.
                if total_cost.is_zero() && metrics.num_ranges == 0 {
                    continue;
                }

                if heap.len() < 10 {
                    // Haven't reached 10 yet, just push
                    heap.push(CompareCostReverse {
                        total_cost,
                        file_id: *file_id,
                        metrics,
                    });
                } else if let Some(min_entry) = heap.peek() {
                    // If current cost is higher than the minimum in our top-10, replace it
                    if total_cost > min_entry.total_cost {
                        heap.pop();
                        heap.push(CompareCostReverse {
                            total_cost,
                            file_id: *file_id,
                            metrics,
                        });
                    }
                }
            }

            // Ascending in the reversed order == most expensive file first.
            let top_files = heap.into_sorted_vec();
            write!(f, ", \"top_file_metrics\": {{")?;
            for (i, item) in top_files.iter().enumerate() {
                let CompareCostReverse {
                    total_cost: _,
                    file_id,
                    metrics,
                } = item;
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "\"{}\": {:?}", file_id, metrics)?;
            }
            write!(f, "}}")?;
        }

        if *range_cache_size > 0 {
            write!(f, ", \"range_cache_size\":{range_cache_size}")?;
        }
        if *range_cache_hit > 0 {
            write!(f, ", \"range_cache_hit\":{range_cache_hit}")?;
        }
        if *range_cache_miss > 0 {
            write!(f, ", \"range_cache_miss\":{range_cache_miss}")?;
        }

        write!(
            f,
            ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
             \"num_peak_range_builders\":{num_peak_range_builders}, \
             \"stream_eof\":{stream_eof}}}"
        )
    }
}
620impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    ///
    /// Note: the cost is added (`+=`) to any previously attached value.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }
626
    /// Attaches the `convert_cost` timer to the metrics set.
    ///
    /// Note: unlike `with_prepare_scan_cost`, this replaces any previously
    /// attached timer instead of accumulating.
    fn with_convert_cost(mut self, time: Time) -> Self {
        self.convert_cost = Some(time);
        self
    }
632
633    /// Merges the local scanner metrics.
634    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
635        let ScannerMetrics {
636            scan_cost,
637            yield_cost,
638            num_batches,
639            num_rows,
640        } = other;
641
642        self.scan_cost += *scan_cost;
643        self.yield_cost += *yield_cost;
644        self.num_rows += *num_rows;
645        self.num_batches += *num_batches;
646    }
647
    /// Merges the local reader metrics.
    ///
    /// Accumulates SST read costs, row-group/row filter counters, index cache
    /// counters and optional verbose metrics, and tracks peak memory usage and
    /// peak builder counts for file range builders.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        // Exhaustive destructuring (including nested `ReaderFilterMetrics`):
        // a new field in the reader metrics must be handled here to compile.
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rg_vector_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_vector_filtered,
                    rows_vector_selected,
                    rows_precise_filtered,
                    fulltext_index_cache_hit,
                    fulltext_index_cache_miss,
                    inverted_index_cache_hit,
                    inverted_index_cache_miss,
                    bloom_filter_cache_hit,
                    bloom_filter_cache_miss,
                    minmax_cache_hit,
                    minmax_cache_miss,
                    pruner_cache_hit,
                    pruner_cache_miss,
                    pruner_prune_cost,
                    inverted_index_apply_metrics,
                    bloom_filter_apply_metrics,
                    fulltext_index_apply_metrics,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost,
            metadata_cache_metrics,
            fetch_metrics,
            metadata_mem_size,
            num_range_builders,
        } = other;

        self.build_parts_cost += *build_cost;
        self.sst_scan_cost += *scan_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;
        self.rg_vector_filtered += *rg_vector_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_vector_filtered += *rows_vector_filtered;
        self.rows_vector_selected += *rows_vector_selected;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
        self.inverted_index_cache_hit += *inverted_index_cache_hit;
        self.inverted_index_cache_miss += *inverted_index_cache_miss;
        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
        self.minmax_cache_hit += *minmax_cache_hit;
        self.minmax_cache_miss += *minmax_cache_miss;
        self.pruner_cache_hit += *pruner_cache_hit;
        self.pruner_cache_miss += *pruner_cache_miss;
        self.pruner_prune_cost += *pruner_prune_cost;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;

        // Merge optional verbose metrics; lazily initialize the target slot
        // the first time a reader actually reports them.
        if let Some(metrics) = inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(InvertedIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(BloomFilterIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(FulltextIndexApplyMetrics::default)
                .merge_from(metrics);
        }
        if let Some(metrics) = fetch_metrics {
            self.fetch_metrics
                .get_or_insert_with(ParquetFetchMetrics::default)
                .merge_from(metrics);
        }
        self.metadata_cache_metrics
            .get_or_insert_with(MetadataCacheMetrics::default)
            .merge_from(metadata_cache_metrics);

        // Track memory usage and update peak.
        self.build_ranges_mem_size += *metadata_mem_size;
        if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
            self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
        }

        // Track number of builders and update peak.
        self.num_range_builders += *num_range_builders;
        if self.num_range_builders > self.num_peak_range_builders {
            self.num_peak_range_builders = self.num_range_builders;
        }
    }
763
764    /// Merges per-file metrics.
765    fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
766        let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
767        for (file_id, metrics) in other {
768            self_file_metrics
769                .entry(*file_id)
770                .or_default()
771                .merge_from(metrics);
772        }
773    }
774
775    /// Sets distributor metrics.
776    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
777        let SeriesDistributorMetrics {
778            num_series_send_timeout,
779            num_series_send_full,
780            num_rows,
781            num_batches,
782            scan_cost,
783            yield_cost,
784            divider_cost,
785        } = distributor_metrics;
786
787        self.num_series_send_timeout += *num_series_send_timeout;
788        self.num_series_send_full += *num_series_send_full;
789        self.num_distributor_rows += *num_rows;
790        self.num_distributor_batches += *num_batches;
791        self.distributor_scan_cost += *scan_cost;
792        self.distributor_yield_cost += *yield_cost;
793        self.distributor_divider_cost += *divider_cost;
794    }
795
796    /// Observes metrics.
797    fn observe_metrics(&self) {
798        READ_STAGE_ELAPSED
799            .with_label_values(&["prepare_scan"])
800            .observe(self.prepare_scan_cost.as_secs_f64());
801        READ_STAGE_ELAPSED
802            .with_label_values(&["build_reader"])
803            .observe(self.build_reader_cost.as_secs_f64());
804        READ_STAGE_ELAPSED
805            .with_label_values(&["scan"])
806            .observe(self.scan_cost.as_secs_f64());
807        READ_STAGE_ELAPSED
808            .with_label_values(&["yield"])
809            .observe(self.yield_cost.as_secs_f64());
810        if let Some(time) = &self.convert_cost {
811            READ_STAGE_ELAPSED
812                .with_label_values(&["convert"])
813                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
814        }
815        READ_STAGE_ELAPSED
816            .with_label_values(&["total"])
817            .observe(self.total_cost.as_secs_f64());
818        READ_ROWS_RETURN.observe(self.num_rows as f64);
819        READ_BATCHES_RETURN.observe(self.num_batches as f64);
820
821        READ_STAGE_ELAPSED
822            .with_label_values(&["build_parts"])
823            .observe(self.build_parts_cost.as_secs_f64());
824
825        READ_ROW_GROUPS_TOTAL
826            .with_label_values(&["before_filtering"])
827            .inc_by(self.rg_total as u64);
828        READ_ROW_GROUPS_TOTAL
829            .with_label_values(&["fulltext_index_filtered"])
830            .inc_by(self.rg_fulltext_filtered as u64);
831        READ_ROW_GROUPS_TOTAL
832            .with_label_values(&["inverted_index_filtered"])
833            .inc_by(self.rg_inverted_filtered as u64);
834        READ_ROW_GROUPS_TOTAL
835            .with_label_values(&["minmax_index_filtered"])
836            .inc_by(self.rg_minmax_filtered as u64);
837        READ_ROW_GROUPS_TOTAL
838            .with_label_values(&["bloom_filter_index_filtered"])
839            .inc_by(self.rg_bloom_filtered as u64);
840        #[cfg(feature = "vector_index")]
841        READ_ROW_GROUPS_TOTAL
842            .with_label_values(&["vector_index_filtered"])
843            .inc_by(self.rg_vector_filtered as u64);
844
845        PRECISE_FILTER_ROWS_TOTAL
846            .with_label_values(&["parquet"])
847            .inc_by(self.rows_precise_filtered as u64);
848        READ_ROWS_IN_ROW_GROUP_TOTAL
849            .with_label_values(&["before_filtering"])
850            .inc_by(self.rows_before_filter as u64);
851        READ_ROWS_IN_ROW_GROUP_TOTAL
852            .with_label_values(&["fulltext_index_filtered"])
853            .inc_by(self.rows_fulltext_filtered as u64);
854        READ_ROWS_IN_ROW_GROUP_TOTAL
855            .with_label_values(&["inverted_index_filtered"])
856            .inc_by(self.rows_inverted_filtered as u64);
857        READ_ROWS_IN_ROW_GROUP_TOTAL
858            .with_label_values(&["bloom_filter_index_filtered"])
859            .inc_by(self.rows_bloom_filtered as u64);
860        #[cfg(feature = "vector_index")]
861        READ_ROWS_IN_ROW_GROUP_TOTAL
862            .with_label_values(&["vector_index_filtered"])
863            .inc_by(self.rows_vector_filtered as u64);
864    }
865}
866
/// Shared state behind [PartitionMetrics]; dropped when the last handle goes away.
struct PartitionMetricsInner {
    /// Id of the region being scanned.
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operation.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    /// In-progress scan gauge; incremented in `PartitionMetrics::new` and
    /// decremented in `Drop`.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}
895
896impl PartitionMetricsInner {
897    fn on_finish(&self, stream_eof: bool) {
898        let mut metrics = self.metrics.lock().unwrap();
899        if metrics.total_cost.is_zero() {
900            metrics.total_cost = self.query_start.elapsed();
901        }
902        if !metrics.stream_eof {
903            metrics.stream_eof = stream_eof;
904        }
905    }
906}
907
908impl MergeMetricsReport for PartitionMetricsInner {
909    fn report(&self, metrics: &mut MergeMetrics) {
910        let mut scan_metrics = self.metrics.lock().unwrap();
911        // Merge the metrics into scan_metrics
912        scan_metrics.merge_metrics.merge(metrics);
913
914        // Reset the input metrics
915        *metrics = MergeMetrics::default();
916    }
917}
918
919impl DedupMetricsReport for PartitionMetricsInner {
920    fn report(&self, metrics: &mut DedupMetrics) {
921        let mut scan_metrics = self.metrics.lock().unwrap();
922        // Merge the metrics into scan_metrics
923        scan_metrics.dedup_metrics.merge(metrics);
924
925        // Reset the input metrics
926        *metrics = DedupMetrics::default();
927    }
928}
929
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Ensures `total_cost` is recorded even when the stream never reached
        // EOF (the `false` leaves `stream_eof` untouched).
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        // Flushes the aggregated counters to the global Prometheus metrics.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Verbose explain promotes the per-partition summary to info level;
        // otherwise it is only visible in debug logs.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
            );
        }
    }
}
956
/// List of PartitionMetrics.
///
/// Slots stay `None` until [PartitionMetricsList::set] is called for that
/// partition index.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
960
961impl PartitionMetricsList {
962    /// Sets a new [PartitionMetrics] at the specified partition.
963    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
964        let mut list = self.0.lock().unwrap();
965        if list.len() <= partition {
966            list.resize(partition + 1, None);
967        }
968        list[partition] = Some(metrics);
969    }
970
971    /// Format verbose metrics for each partition for explain.
972    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
973        let list = self.0.lock().unwrap();
974        write!(f, ", \"metrics_per_partition\": ")?;
975        f.debug_list()
976            .entries(list.iter().filter_map(|p| p.as_ref()))
977            .finish()?;
978        write!(f, "}}")
979    }
980}
981
/// Metrics while reading a partition.
///
/// Cloning is cheap: all clones share the same inner state via [`Arc`].
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
985
impl PartitionMetrics {
    /// Creates metrics for one scan partition and registers the per-partition
    /// timers in `metrics_set`. Increments the in-progress scan gauge; the
    /// matching decrement happens when the shared inner state is dropped.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
        // Time elapsed since query start counts as scan preparation.
        let metrics = ScanMetricsSet::default()
            .with_prepare_scan_cost(query_start.elapsed())
            .with_convert_cost(convert_cost.clone());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost,
            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the time from query start until the stream is first polled.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Increments the number of memtable ranges scanned by this partition.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Increments the number of file ranges scanned by this partition.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `duration` to the compute time reported to DataFusion, skipping
    /// zero durations to avoid needless metric updates.
    fn record_elapsed_compute(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        self.0.elapsed_compute.add_duration(duration);
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds the cost of converting [`Batch`]es; also counted as compute time.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
        self.record_elapsed_compute(cost);
    }

    /// Reports memtable scan metrics.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        // Both scan and yield time count as compute time for DataFusion.
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.record_elapsed_compute(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);
        self.record_elapsed_compute(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_reader_cost`.
    pub fn merge_reader_metrics(
        &self,
        metrics: &ReaderMetrics,
        per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
    ) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);

        // Merge per-file metrics if provided
        if let Some(file_metrics) = per_file_metrics {
            metrics_set.merge_per_file_metrics(file_metrics);
        }
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }

    /// Returns whether verbose explain is enabled.
    pub(crate) fn explain_verbose(&self) -> bool {
        self.0.explain_verbose
    }

    /// Returns a MergeMetricsReport trait object for reporting merge metrics.
    pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
        self.0.clone()
    }

    /// Returns a DedupMetricsReport trait object for reporting dedup metrics.
    pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
        self.0.clone()
    }

    /// Increments the total bytes added to the range cache.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_size(&self, size: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_size += size;
    }

    /// Increments the range cache hit counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_hit(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_hit += 1;
    }

    /// Increments the range cache miss counter.
    #[allow(dead_code)]
    pub(crate) fn inc_range_cache_miss(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.range_cache_miss += 1;
    }
}
1141
1142impl fmt::Debug for PartitionMetrics {
1143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1144        let metrics = self.0.metrics.lock().unwrap();
1145        write!(
1146            f,
1147            r#"{{"partition":{}, "metrics":{:?}}}"#,
1148            self.0.partition, metrics
1149        )
1150    }
1151}
1152
/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of times sending a series timed out in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times the send channel was full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the series distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the series distributor to yield.
    pub(crate) yield_cost: Duration,
    /// Duration spent in divider operations.
    pub(crate) divider_cost: Duration,
}
1171
1172/// Scans memtable ranges at `index`.
1173#[tracing::instrument(
1174    skip_all,
1175    fields(
1176        region_id = %stream_ctx.input.region_metadata().region_id,
1177        file_or_mem_index = %index.index,
1178        row_group_index = %index.row_group_index,
1179        source = "mem"
1180    )
1181)]
1182pub(crate) fn scan_mem_ranges(
1183    stream_ctx: Arc<StreamContext>,
1184    part_metrics: PartitionMetrics,
1185    index: RowGroupIndex,
1186    time_range: FileTimeRange,
1187) -> impl Stream<Item = Result<Batch>> {
1188    try_stream! {
1189        let ranges = stream_ctx.input.build_mem_ranges(index);
1190        part_metrics.inc_num_mem_ranges(ranges.len());
1191        for range in ranges {
1192            let build_reader_start = Instant::now();
1193            let mem_scan_metrics = Some(MemScanMetrics::default());
1194            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
1195            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1196
1197            let mut source = Source::Iter(iter);
1198            while let Some(batch) = source.next_batch().await? {
1199                yield batch;
1200            }
1201
1202            // Report the memtable scan metrics to partition metrics
1203            if let Some(ref metrics) = mem_scan_metrics {
1204                let data = metrics.data();
1205                part_metrics.report_mem_scan_metrics(&data);
1206            }
1207        }
1208    }
1209}
1210
1211/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
1212#[tracing::instrument(
1213    skip_all,
1214    fields(
1215        region_id = %stream_ctx.input.region_metadata().region_id,
1216        row_group_index = %index.index,
1217        source = "mem_flat"
1218    )
1219)]
1220pub(crate) fn scan_flat_mem_ranges(
1221    stream_ctx: Arc<StreamContext>,
1222    part_metrics: PartitionMetrics,
1223    index: RowGroupIndex,
1224    time_range: FileTimeRange,
1225) -> impl Stream<Item = Result<RecordBatch>> {
1226    try_stream! {
1227        let ranges = stream_ctx.input.build_mem_ranges(index);
1228        part_metrics.inc_num_mem_ranges(ranges.len());
1229        for range in ranges {
1230            let build_reader_start = Instant::now();
1231            let mem_scan_metrics = Some(MemScanMetrics::default());
1232            let mut iter = range.build_record_batch_iter(Some(time_range), mem_scan_metrics.clone())?;
1233            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
1234
1235            while let Some(record_batch) = iter.next().transpose()? {
1236                yield record_batch;
1237            }
1238
1239            // Report the memtable scan metrics to partition metrics
1240            if let Some(ref metrics) = mem_scan_metrics {
1241                let data = metrics.data();
1242                part_metrics.report_mem_scan_metrics(&data);
1243            }
1244        }
1245    }
1246}
1247
/// Files with row count greater than this threshold can contribute to the estimation.
///
/// A file qualifies only when it holds at least one default row group worth of rows.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average batch size (rows per series) required after splitting.
/// Set below 60 because a series may only have 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;
1255
1256/// Returns true if splitting flat record batches may improve merge performance.
1257pub(crate) fn should_split_flat_batches_for_merge(
1258    stream_ctx: &Arc<StreamContext>,
1259    range_meta: &RangeMeta,
1260) -> bool {
1261    // Number of files to split and scan.
1262    let mut num_files_to_split = 0;
1263    let mut num_mem_rows = 0;
1264    let mut num_mem_series = 0;
1265    // Checks each file range, returns early if any range is not splittable.
1266    // For mem ranges, we collect the total number of rows and series because the number of rows in a
1267    // mem range may be too small.
1268    for index in &range_meta.row_group_indices {
1269        if stream_ctx.is_mem_range_index(*index) {
1270            let memtable = &stream_ctx.input.memtables[index.index];
1271            // Is mem range
1272            let stats = memtable.stats();
1273            num_mem_rows += stats.num_rows();
1274            num_mem_series += stats.series_count();
1275        } else if stream_ctx.is_file_range_index(*index) {
1276            // This is a file range.
1277            let file_index = index.index - stream_ctx.input.num_memtables();
1278            let file = &stream_ctx.input.files[file_index];
1279            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
1280                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
1281                continue;
1282            }
1283            debug_assert!(file.meta_ref().num_rows > 0);
1284            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
1285                // We can't split batches in a file.
1286                return false;
1287            } else {
1288                num_files_to_split += 1;
1289            }
1290        }
1291        // Skips non-file and non-mem ranges.
1292    }
1293
1294    if num_files_to_split > 0 {
1295        // We mainly consider file ranges because they have enough data for sampling.
1296        true
1297    } else if num_mem_series > 0 && num_mem_rows > 0 {
1298        // If we don't have files to scan, we check whether to split by the memtable.
1299        can_split_series(num_mem_rows as u64, num_mem_series as u64)
1300    } else {
1301        false
1302    }
1303}
1304
1305fn can_split_series(num_rows: u64, num_series: u64) -> bool {
1306    assert!(num_series > 0);
1307    assert!(num_rows > 0);
1308
1309    // It doesn't have too many series or it will have enough rows for each batch.
1310    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
1311}
1312
1313/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
1314/// based on the `explain_verbose` flag.
1315fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
1316    if explain_verbose {
1317        ReaderFilterMetrics {
1318            inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
1319            bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
1320            fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
1321            ..Default::default()
1322        }
1323    } else {
1324        ReaderFilterMetrics::default()
1325    }
1326}
1327
1328/// Scans file ranges at `index`.
1329#[tracing::instrument(
1330    skip_all,
1331    fields(
1332        region_id = %stream_ctx.input.region_metadata().region_id,
1333        row_group_index = %index.index,
1334        source = read_type
1335    )
1336)]
1337pub(crate) async fn scan_file_ranges(
1338    stream_ctx: Arc<StreamContext>,
1339    part_metrics: PartitionMetrics,
1340    index: RowGroupIndex,
1341    read_type: &'static str,
1342    partition_pruner: Arc<PartitionPruner>,
1343) -> Result<impl Stream<Item = Result<Batch>>> {
1344    let mut reader_metrics = ReaderMetrics {
1345        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1346        ..Default::default()
1347    };
1348    let ranges = partition_pruner
1349        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1350        .await?;
1351    part_metrics.inc_num_file_ranges(ranges.len());
1352    part_metrics.merge_reader_metrics(&reader_metrics, None);
1353
1354    // Creates initial per-file metrics with build_part_cost.
1355    let init_per_file_metrics = if part_metrics.explain_verbose() {
1356        let file = stream_ctx.input.file_from_index(index);
1357        let file_id = file.file_id();
1358
1359        let mut map = HashMap::new();
1360        map.insert(
1361            file_id,
1362            FileScanMetrics {
1363                build_part_cost: reader_metrics.build_cost,
1364                ..Default::default()
1365            },
1366        );
1367        Some(map)
1368    } else {
1369        None
1370    };
1371
1372    Ok(build_file_range_scan_stream(
1373        stream_ctx,
1374        part_metrics,
1375        read_type,
1376        ranges,
1377        init_per_file_metrics,
1378    ))
1379}
1380
1381/// Scans file ranges at `index` using flat reader that returns RecordBatch.
1382#[tracing::instrument(
1383    skip_all,
1384    fields(
1385        region_id = %stream_ctx.input.region_metadata().region_id,
1386        row_group_index = %index.index,
1387        source = read_type
1388    )
1389)]
1390pub(crate) async fn scan_flat_file_ranges(
1391    stream_ctx: Arc<StreamContext>,
1392    part_metrics: PartitionMetrics,
1393    index: RowGroupIndex,
1394    read_type: &'static str,
1395    partition_pruner: Arc<PartitionPruner>,
1396) -> Result<impl Stream<Item = Result<RecordBatch>>> {
1397    let mut reader_metrics = ReaderMetrics {
1398        filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
1399        ..Default::default()
1400    };
1401    let ranges = partition_pruner
1402        .build_file_ranges(index, &part_metrics, &mut reader_metrics)
1403        .await?;
1404    part_metrics.inc_num_file_ranges(ranges.len());
1405    part_metrics.merge_reader_metrics(&reader_metrics, None);
1406
1407    // Creates initial per-file metrics with build_part_cost.
1408    let init_per_file_metrics = if part_metrics.explain_verbose() {
1409        let file = stream_ctx.input.file_from_index(index);
1410        let file_id = file.file_id();
1411
1412        let mut map = HashMap::new();
1413        map.insert(
1414            file_id,
1415            FileScanMetrics {
1416                build_part_cost: reader_metrics.build_cost,
1417                ..Default::default()
1418            },
1419        );
1420        Some(map)
1421    } else {
1422        None
1423    };
1424
1425    Ok(build_flat_file_range_scan_stream(
1426        stream_ctx,
1427        part_metrics,
1428        read_type,
1429        ranges,
1430        init_per_file_metrics,
1431    ))
1432}
1433
1434/// Build the stream of scanning the input [`FileRange`]s.
1435#[tracing::instrument(
1436    skip_all,
1437    fields(read_type = read_type, range_count = ranges.len())
1438)]
1439pub fn build_file_range_scan_stream(
1440    stream_ctx: Arc<StreamContext>,
1441    part_metrics: PartitionMetrics,
1442    read_type: &'static str,
1443    ranges: SmallVec<[FileRange; 2]>,
1444    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1445) -> impl Stream<Item = Result<Batch>> {
1446    try_stream! {
1447        let fetch_metrics = if part_metrics.explain_verbose() {
1448            Some(Arc::new(ParquetFetchMetrics::default()))
1449        } else {
1450            None
1451        };
1452        let reader_metrics = &mut ReaderMetrics {
1453            fetch_metrics: fetch_metrics.clone(),
1454            ..Default::default()
1455        };
1456        for range in ranges {
1457            let build_reader_start = Instant::now();
1458            let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else {
1459                continue;
1460            };
1461            let build_cost = build_reader_start.elapsed();
1462            part_metrics.inc_build_reader_cost(build_cost);
1463            let compat_batch = range.compat_batch();
1464            let mut source = Source::PruneReader(reader);
1465            while let Some(mut batch) = source.next_batch().await? {
1466                if let Some(compact_batch) = compat_batch {
1467                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
1468                }
1469                yield batch;
1470            }
1471            if let Source::PruneReader(reader) = source {
1472                let prune_metrics = reader.metrics();
1473
1474                // Update per-file metrics if tracking is enabled
1475                if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1476                    let file_id = range.file_handle().file_id();
1477                    let file_metrics = file_metrics_map
1478                        .entry(file_id)
1479                        .or_insert_with(FileScanMetrics::default);
1480
1481                    file_metrics.num_ranges += 1;
1482                    file_metrics.num_rows += prune_metrics.num_rows;
1483                    file_metrics.build_reader_cost += build_cost;
1484                    file_metrics.scan_cost += prune_metrics.scan_cost;
1485                }
1486
1487                reader_metrics.merge_from(&prune_metrics);
1488            }
1489        }
1490
1491        // Reports metrics.
1492        reader_metrics.observe_rows(read_type);
1493        reader_metrics.filter_metrics.observe();
1494        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1495    }
1496}
1497
1498/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
1499#[tracing::instrument(
1500    skip_all,
1501    fields(read_type = read_type, range_count = ranges.len())
1502)]
1503pub fn build_flat_file_range_scan_stream(
1504    _stream_ctx: Arc<StreamContext>,
1505    part_metrics: PartitionMetrics,
1506    read_type: &'static str,
1507    ranges: SmallVec<[FileRange; 2]>,
1508    mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
1509) -> impl Stream<Item = Result<RecordBatch>> {
1510    try_stream! {
1511        let fetch_metrics = if part_metrics.explain_verbose() {
1512            Some(Arc::new(ParquetFetchMetrics::default()))
1513        } else {
1514            None
1515        };
1516        let reader_metrics = &mut ReaderMetrics {
1517            fetch_metrics: fetch_metrics.clone(),
1518            ..Default::default()
1519        };
1520        for range in ranges {
1521            let build_reader_start = Instant::now();
1522            let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue};
1523            let build_cost = build_reader_start.elapsed();
1524            part_metrics.inc_build_reader_cost(build_cost);
1525
1526            let may_compat = range
1527                .compat_batch()
1528                .map(|compat| {
1529                    compat.as_flat().context(UnexpectedSnafu {
1530                        reason: "Invalid compat for flat format",
1531                    })
1532                })
1533                .transpose()?;
1534
1535            let mapper = range.compaction_projection_mapper();
1536            while let Some(record_batch) = reader.next_batch().await? {
1537                let record_batch = if let Some(mapper) = mapper {
1538                    let batch = mapper.project(record_batch)?;
1539                    batch
1540                } else {
1541                    record_batch
1542                };
1543
1544                if let Some(flat_compat) = may_compat {
1545                    let batch = flat_compat.compat(record_batch)?;
1546                    yield batch;
1547                } else {
1548                    yield record_batch;
1549                }
1550            }
1551
1552            let prune_metrics = reader.metrics();
1553
1554            // Update per-file metrics if tracking is enabled
1555            if let Some(file_metrics_map) = per_file_metrics.as_mut() {
1556                let file_id = range.file_handle().file_id();
1557                let file_metrics = file_metrics_map
1558                    .entry(file_id)
1559                    .or_insert_with(FileScanMetrics::default);
1560
1561                file_metrics.num_ranges += 1;
1562                file_metrics.num_rows += prune_metrics.num_rows;
1563                file_metrics.build_reader_cost += build_cost;
1564                file_metrics.scan_cost += prune_metrics.scan_cost;
1565            }
1566
1567            reader_metrics.merge_from(&prune_metrics);
1568        }
1569
1570        // Reports metrics.
1571        reader_metrics.observe_rows(read_type);
1572        reader_metrics.filter_metrics.observe();
1573        part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
1574    }
1575}
1576
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let reader = context
        .input
        .extension_range(index.index)
        .reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1594
1595pub(crate) async fn maybe_scan_other_ranges(
1596    context: &Arc<StreamContext>,
1597    index: RowGroupIndex,
1598    metrics: &PartitionMetrics,
1599) -> Result<BoxedBatchStream> {
1600    #[cfg(feature = "enterprise")]
1601    {
1602        scan_extension_range(context.clone(), index, metrics.clone()).await
1603    }
1604
1605    #[cfg(not(feature = "enterprise"))]
1606    {
1607        let _ = context;
1608        let _ = index;
1609        let _ = metrics;
1610
1611        crate::error::UnexpectedSnafu {
1612            reason: "no other ranges scannable",
1613        }
1614        .fail()
1615    }
1616}
1617
/// Build the stream of scanning the extension range in flat format denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_flat_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    use snafu::ResultExt;

    // Same shape as `scan_extension_range`, but goes through the flat-format
    // reader so the stream yields record batches instead of `Batch`es.
    let ext_range = context.input.extension_range(index.index);
    let ext_reader = ext_range.flat_reader(context.as_ref());
    ext_reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
1635
1636pub(crate) async fn maybe_scan_flat_other_ranges(
1637    context: &Arc<StreamContext>,
1638    index: RowGroupIndex,
1639    metrics: &PartitionMetrics,
1640) -> Result<BoxedRecordBatchStream> {
1641    #[cfg(feature = "enterprise")]
1642    {
1643        scan_flat_extension_range(context.clone(), index, metrics.clone()).await
1644    }
1645
1646    #[cfg(not(feature = "enterprise"))]
1647    {
1648        let _ = context;
1649        let _ = index;
1650        let _ = metrics;
1651
1652        crate::error::UnexpectedSnafu {
1653            reason: "no other ranges scannable in flat format",
1654        }
1655        .fail()
1656    }
1657}
1658
1659/// A stream wrapper that splits record batches from an inner stream.
1660pub(crate) struct SplitRecordBatchStream<S> {
1661    /// The inner stream that yields record batches.
1662    inner: S,
1663    /// Buffer for split batches.
1664    batches: VecDeque<RecordBatch>,
1665}
1666
1667impl<S> SplitRecordBatchStream<S> {
1668    /// Creates a new splitting stream wrapper.
1669    pub(crate) fn new(inner: S) -> Self {
1670        Self {
1671            inner,
1672            batches: VecDeque::new(),
1673        }
1674    }
1675}
1676
1677impl<S> Stream for SplitRecordBatchStream<S>
1678where
1679    S: Stream<Item = Result<RecordBatch>> + Unpin,
1680{
1681    type Item = Result<RecordBatch>;
1682
1683    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1684        loop {
1685            // First, check if we have buffered split batches
1686            if let Some(batch) = self.batches.pop_front() {
1687                return Poll::Ready(Some(Ok(batch)));
1688            }
1689
1690            // Poll the inner stream for the next batch
1691            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
1692                Some(Ok(batch)) => batch,
1693                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1694                None => return Poll::Ready(None),
1695            };
1696
1697            // Split the batch and buffer the results
1698            split_record_batch(record_batch, &mut self.batches);
1699            // Continue the loop to return the first split batch
1700        }
1701    }
1702}
1703
1704/// Splits the batch by timestamps.
1705///
1706/// # Panics
1707/// Panics if the timestamp array is invalid.
1708pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1709    let batch_rows = record_batch.num_rows();
1710    if batch_rows == 0 {
1711        return;
1712    }
1713    if batch_rows < 2 {
1714        batches.push_back(record_batch);
1715        return;
1716    }
1717
1718    let time_index_pos = time_index_column_index(record_batch.num_columns());
1719    let timestamps = record_batch.column(time_index_pos);
1720    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1721    let mut offsets = Vec::with_capacity(16);
1722    offsets.push(0);
1723    let values = ts_values.values();
1724    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1725        if value > values[i + 1] {
1726            offsets.push(i + 1);
1727        }
1728    }
1729    offsets.push(values.len());
1730
1731    // Splits the batch by offsets.
1732    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1733        let end = offsets[i + 1];
1734        let rows_in_batch = end - start;
1735        batches.push_back(record_batch.slice(start, rows_in_batch));
1736    }
1737}