// mito2/read/scan_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for scanners.
16
17use std::collections::VecDeque;
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::timestamp::timestamp_array_to_primitive;
28use futures::Stream;
29use prometheus::IntGauge;
30use smallvec::SmallVec;
31use snafu::OptionExt;
32use store_api::storage::RegionId;
33
34use crate::error::{Result, UnexpectedSnafu};
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
41use crate::read::scan_region::StreamContext;
42use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
43use crate::sst::file::FileTimeRange;
44use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
45use crate::sst::parquet::file_range::FileRange;
46use crate::sst::parquet::flat_format::time_index_column_index;
47use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
48
/// Verbose scan metrics for a partition.
///
/// Accumulated while a partition stream runs and rendered in a JSON-like
/// form by the `Debug` impl below for explain/verbose logging.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // Memtable related metrics:
    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeout in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration of the series distributor to scan.
    distributor_scan_cost: Duration,
    /// Duration of the series distributor to yield.
    distributor_yield_cost: Duration,

    /// The stream reached EOF
    stream_eof: bool,
}
130
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a JSON-like object.
    ///
    /// Core metrics are always written; filter/distributor/memtable counters
    /// are written only when non-zero to keep the output compact.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to `ScanMetricsSet` without
        // updating this impl becomes a compile error.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        // Write core metrics
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Write non-zero filter counters
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Write non-zero distributor metrics
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Write non-zero memtable metrics
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // `stream_eof` is always written last and closes the object.
        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
261impl ScanMetricsSet {
262    /// Attaches the `prepare_scan_cost` to the metrics set.
263    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
264        self.prepare_scan_cost += cost;
265        self
266    }
267
268    /// Merges the local scanner metrics.
269    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
270        let ScannerMetrics {
271            prepare_scan_cost,
272            build_reader_cost,
273            scan_cost,
274            yield_cost,
275            num_batches,
276            num_rows,
277            num_mem_ranges,
278            num_file_ranges,
279        } = other;
280
281        self.prepare_scan_cost += *prepare_scan_cost;
282        self.build_reader_cost += *build_reader_cost;
283        self.scan_cost += *scan_cost;
284        self.yield_cost += *yield_cost;
285        self.num_rows += *num_rows;
286        self.num_batches += *num_batches;
287        self.num_mem_ranges += *num_mem_ranges;
288        self.num_file_ranges += *num_file_ranges;
289    }
290
291    /// Merges the local reader metrics.
292    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
293        let ReaderMetrics {
294            build_cost,
295            filter_metrics:
296                ReaderFilterMetrics {
297                    rg_total,
298                    rg_fulltext_filtered,
299                    rg_inverted_filtered,
300                    rg_minmax_filtered,
301                    rg_bloom_filtered,
302                    rows_total,
303                    rows_fulltext_filtered,
304                    rows_inverted_filtered,
305                    rows_bloom_filtered,
306                    rows_precise_filtered,
307                },
308            num_record_batches,
309            num_batches,
310            num_rows,
311            scan_cost: _,
312        } = other;
313
314        self.build_parts_cost += *build_cost;
315
316        self.rg_total += *rg_total;
317        self.rg_fulltext_filtered += *rg_fulltext_filtered;
318        self.rg_inverted_filtered += *rg_inverted_filtered;
319        self.rg_minmax_filtered += *rg_minmax_filtered;
320        self.rg_bloom_filtered += *rg_bloom_filtered;
321
322        self.rows_before_filter += *rows_total;
323        self.rows_fulltext_filtered += *rows_fulltext_filtered;
324        self.rows_inverted_filtered += *rows_inverted_filtered;
325        self.rows_bloom_filtered += *rows_bloom_filtered;
326        self.rows_precise_filtered += *rows_precise_filtered;
327
328        self.num_sst_record_batches += *num_record_batches;
329        self.num_sst_batches += *num_batches;
330        self.num_sst_rows += *num_rows;
331    }
332
333    /// Sets distributor metrics.
334    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
335        let SeriesDistributorMetrics {
336            num_series_send_timeout,
337            num_series_send_full,
338            num_rows,
339            num_batches,
340            scan_cost,
341            yield_cost,
342        } = distributor_metrics;
343
344        self.num_series_send_timeout += *num_series_send_timeout;
345        self.num_series_send_full += *num_series_send_full;
346        self.num_distributor_rows += *num_rows;
347        self.num_distributor_batches += *num_batches;
348        self.distributor_scan_cost += *scan_cost;
349        self.distributor_yield_cost += *yield_cost;
350    }
351
352    /// Observes metrics.
353    fn observe_metrics(&self) {
354        READ_STAGE_ELAPSED
355            .with_label_values(&["prepare_scan"])
356            .observe(self.prepare_scan_cost.as_secs_f64());
357        READ_STAGE_ELAPSED
358            .with_label_values(&["build_reader"])
359            .observe(self.build_reader_cost.as_secs_f64());
360        READ_STAGE_ELAPSED
361            .with_label_values(&["scan"])
362            .observe(self.scan_cost.as_secs_f64());
363        READ_STAGE_ELAPSED
364            .with_label_values(&["yield"])
365            .observe(self.yield_cost.as_secs_f64());
366        READ_STAGE_ELAPSED
367            .with_label_values(&["total"])
368            .observe(self.total_cost.as_secs_f64());
369        READ_ROWS_RETURN.observe(self.num_rows as f64);
370        READ_BATCHES_RETURN.observe(self.num_batches as f64);
371
372        READ_STAGE_ELAPSED
373            .with_label_values(&["build_parts"])
374            .observe(self.build_parts_cost.as_secs_f64());
375
376        READ_ROW_GROUPS_TOTAL
377            .with_label_values(&["before_filtering"])
378            .inc_by(self.rg_total as u64);
379        READ_ROW_GROUPS_TOTAL
380            .with_label_values(&["fulltext_index_filtered"])
381            .inc_by(self.rg_fulltext_filtered as u64);
382        READ_ROW_GROUPS_TOTAL
383            .with_label_values(&["inverted_index_filtered"])
384            .inc_by(self.rg_inverted_filtered as u64);
385        READ_ROW_GROUPS_TOTAL
386            .with_label_values(&["minmax_index_filtered"])
387            .inc_by(self.rg_minmax_filtered as u64);
388        READ_ROW_GROUPS_TOTAL
389            .with_label_values(&["bloom_filter_index_filtered"])
390            .inc_by(self.rg_bloom_filtered as u64);
391
392        PRECISE_FILTER_ROWS_TOTAL
393            .with_label_values(&["parquet"])
394            .inc_by(self.rows_precise_filtered as u64);
395        READ_ROWS_IN_ROW_GROUP_TOTAL
396            .with_label_values(&["before_filtering"])
397            .inc_by(self.rows_before_filter as u64);
398        READ_ROWS_IN_ROW_GROUP_TOTAL
399            .with_label_values(&["fulltext_index_filtered"])
400            .inc_by(self.rows_fulltext_filtered as u64);
401        READ_ROWS_IN_ROW_GROUP_TOTAL
402            .with_label_values(&["inverted_index_filtered"])
403            .inc_by(self.rows_inverted_filtered as u64);
404        READ_ROWS_IN_ROW_GROUP_TOTAL
405            .with_label_values(&["bloom_filter_index_filtered"])
406            .inc_by(self.rows_bloom_filtered as u64);
407    }
408}
409
/// Shared state behind [`PartitionMetrics`]; metrics for one partition scan.
///
/// The verbose [`ScanMetricsSet`] is mutex-protected; the DataFusion `Time`
/// metrics below are reported through the [`ExecutionPlanMetricsSet`].
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operation.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to use verbose logging.
    explain_verbose: bool,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    // Gauge of scans in progress; decremented on drop.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
    /// Duration to convert [`Batch`]es.
    convert_cost: Time,
    /// Aggregated compute time reported to DataFusion.
    elapsed_compute: Time,
}
438
439impl PartitionMetricsInner {
440    fn on_finish(&self, stream_eof: bool) {
441        let mut metrics = self.metrics.lock().unwrap();
442        if metrics.total_cost.is_zero() {
443            metrics.total_cost = self.query_start.elapsed();
444        }
445        if !metrics.stream_eof {
446            metrics.stream_eof = stream_eof;
447        }
448    }
449}
450
impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        // Finalize total_cost even if the stream was dropped before EOF;
        // `false` keeps `stream_eof` unset unless it was already true.
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        // Report the accumulated metrics to Prometheus exactly once.
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        // Same message either way; verbose explain promotes it to info level.
        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}
479
480/// List of PartitionMetrics.
481#[derive(Default)]
482pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
483
484impl PartitionMetricsList {
485    /// Sets a new [PartitionMetrics] at the specified partition.
486    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
487        let mut list = self.0.lock().unwrap();
488        if list.len() <= partition {
489            list.resize(partition + 1, None);
490        }
491        list[partition] = Some(metrics);
492    }
493
494    /// Format verbose metrics for each partition for explain.
495    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
496        let list = self.0.lock().unwrap();
497        write!(f, ", \"metrics_per_partition\": ")?;
498        f.debug_list()
499            .entries(list.iter().filter_map(|p| p.as_ref()))
500            .finish()?;
501        write!(f, "}}")
502    }
503}
504
/// Metrics while reading a partition.
///
/// Cloning is cheap: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
508
509impl PartitionMetrics {
510    pub(crate) fn new(
511        region_id: RegionId,
512        partition: usize,
513        scanner_type: &'static str,
514        query_start: Instant,
515        explain_verbose: bool,
516        metrics_set: &ExecutionPlanMetricsSet,
517    ) -> Self {
518        let partition_str = partition.to_string();
519        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
520        in_progress_scan.inc();
521        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
522        let inner = PartitionMetricsInner {
523            region_id,
524            partition,
525            scanner_type,
526            query_start,
527            explain_verbose,
528            metrics: Mutex::new(metrics),
529            in_progress_scan,
530            build_parts_cost: MetricBuilder::new(metrics_set)
531                .subset_time("build_parts_cost", partition),
532            build_reader_cost: MetricBuilder::new(metrics_set)
533                .subset_time("build_reader_cost", partition),
534            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
535            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
536            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
537            elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
538        };
539        Self(Arc::new(inner))
540    }
541
542    pub(crate) fn on_first_poll(&self) {
543        let mut metrics = self.0.metrics.lock().unwrap();
544        metrics.first_poll = self.0.query_start.elapsed();
545    }
546
547    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
548        let mut metrics = self.0.metrics.lock().unwrap();
549        metrics.num_mem_ranges += num;
550    }
551
552    pub fn inc_num_file_ranges(&self, num: usize) {
553        let mut metrics = self.0.metrics.lock().unwrap();
554        metrics.num_file_ranges += num;
555    }
556
557    fn record_elapsed_compute(&self, duration: Duration) {
558        if duration.is_zero() {
559            return;
560        }
561        self.0.elapsed_compute.add_duration(duration);
562    }
563
564    /// Merges `build_reader_cost`.
565    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
566        self.0.build_reader_cost.add_duration(cost);
567
568        let mut metrics = self.0.metrics.lock().unwrap();
569        metrics.build_reader_cost += cost;
570    }
571
572    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
573        self.0.convert_cost.add_duration(cost);
574        self.record_elapsed_compute(cost);
575    }
576
577    /// Reports memtable scan metrics.
578    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
579        let mut metrics = self.0.metrics.lock().unwrap();
580        metrics.mem_scan_cost += data.scan_cost;
581        metrics.mem_rows += data.num_rows;
582        metrics.mem_batches += data.num_batches;
583        metrics.mem_series += data.total_series;
584    }
585
586    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
587    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
588        self.0
589            .build_reader_cost
590            .add_duration(metrics.build_reader_cost);
591        self.0.scan_cost.add_duration(metrics.scan_cost);
592        self.record_elapsed_compute(metrics.scan_cost);
593        self.0.yield_cost.add_duration(metrics.yield_cost);
594        self.record_elapsed_compute(metrics.yield_cost);
595
596        let mut metrics_set = self.0.metrics.lock().unwrap();
597        metrics_set.merge_scanner_metrics(metrics);
598    }
599
600    /// Merges [ReaderMetrics] and `build_reader_cost`.
601    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
602        self.0.build_parts_cost.add_duration(metrics.build_cost);
603
604        let mut metrics_set = self.0.metrics.lock().unwrap();
605        metrics_set.merge_reader_metrics(metrics);
606    }
607
608    /// Finishes the query.
609    pub(crate) fn on_finish(&self) {
610        self.0.on_finish(true);
611    }
612
613    /// Sets the distributor metrics.
614    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
615        let mut metrics_set = self.0.metrics.lock().unwrap();
616        metrics_set.set_distributor_metrics(metrics);
617    }
618}
619
620impl fmt::Debug for PartitionMetrics {
621    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622        let metrics = self.0.metrics.lock().unwrap();
623        write!(
624            f,
625            r#"{{"partition":{}, "metrics":{:?}}}"#,
626            self.0.partition, metrics
627        )
628    }
629}
630
/// Metrics for the series distributor.
///
/// Folded into [`ScanMetricsSet`] via `set_distributor_metrics`.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeout in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of send full in SeriesScan.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the series distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the series distributor to yield.
    pub(crate) yield_cost: Duration,
}
647
648/// Scans memtable ranges at `index`.
649pub(crate) fn scan_mem_ranges(
650    stream_ctx: Arc<StreamContext>,
651    part_metrics: PartitionMetrics,
652    index: RowGroupIndex,
653    time_range: FileTimeRange,
654) -> impl Stream<Item = Result<Batch>> {
655    try_stream! {
656        let ranges = stream_ctx.input.build_mem_ranges(index);
657        part_metrics.inc_num_mem_ranges(ranges.len());
658        for range in ranges {
659            let build_reader_start = Instant::now();
660            let mem_scan_metrics = Some(MemScanMetrics::default());
661            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
662            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
663
664            let mut source = Source::Iter(iter);
665            while let Some(batch) = source.next_batch().await? {
666                yield batch;
667            }
668
669            // Report the memtable scan metrics to partition metrics
670            if let Some(ref metrics) = mem_scan_metrics {
671                let data = metrics.data();
672                part_metrics.report_mem_scan_metrics(&data);
673            }
674        }
675    }
676}
677
678/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
679pub(crate) fn scan_flat_mem_ranges(
680    stream_ctx: Arc<StreamContext>,
681    part_metrics: PartitionMetrics,
682    index: RowGroupIndex,
683) -> impl Stream<Item = Result<RecordBatch>> {
684    try_stream! {
685        let ranges = stream_ctx.input.build_mem_ranges(index);
686        part_metrics.inc_num_mem_ranges(ranges.len());
687        for range in ranges {
688            let build_reader_start = Instant::now();
689            let mem_scan_metrics = Some(MemScanMetrics::default());
690            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
691            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
692
693            while let Some(record_batch) = iter.next().transpose()? {
694                yield record_batch;
695            }
696
697            // Report the memtable scan metrics to partition metrics
698            if let Some(ref metrics) = mem_scan_metrics {
699                let data = metrics.data();
700                part_metrics.report_mem_scan_metrics(&data);
701            }
702        }
703    }
704}
705
/// Files with row count greater than this threshold can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum batch size after splitting. The batch size is less than 60 because a series may only have
/// 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;
713
714/// Returns true if splitting flat record batches may improve merge performance.
715pub(crate) fn should_split_flat_batches_for_merge(
716    stream_ctx: &Arc<StreamContext>,
717    range_meta: &RangeMeta,
718) -> bool {
719    // Number of files to split and scan.
720    let mut num_files_to_split = 0;
721    let mut num_mem_rows = 0;
722    let mut num_mem_series = 0;
723    // Checks each file range, returns early if any range is not splittable.
724    // For mem ranges, we collect the total number of rows and series because the number of rows in a
725    // mem range may be too small.
726    for index in &range_meta.row_group_indices {
727        if stream_ctx.is_mem_range_index(*index) {
728            let memtable = &stream_ctx.input.memtables[index.index];
729            // Is mem range
730            let stats = memtable.stats();
731            num_mem_rows += stats.num_rows();
732            num_mem_series += stats.series_count();
733        } else if stream_ctx.is_file_range_index(*index) {
734            // This is a file range.
735            let file_index = index.index - stream_ctx.input.num_memtables();
736            let file = &stream_ctx.input.files[file_index];
737            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
738                // If the file doesn't have enough rows, or the number of series is unavailable, skips it.
739                continue;
740            }
741            debug_assert!(file.meta_ref().num_rows > 0);
742            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
743                // We can't split batches in a file.
744                return false;
745            } else {
746                num_files_to_split += 1;
747            }
748        }
749        // Skips non-file and non-mem ranges.
750    }
751
752    if num_files_to_split > 0 {
753        // We mainly consider file ranges because they have enough data for sampling.
754        true
755    } else if num_mem_series > 0 && num_mem_rows > 0 {
756        // If we don't have files to scan, we check whether to split by the memtable.
757        can_split_series(num_mem_rows as u64, num_mem_series as u64)
758    } else {
759        false
760    }
761}
762
763fn can_split_series(num_rows: u64, num_series: u64) -> bool {
764    assert!(num_series > 0);
765    assert!(num_rows > 0);
766
767    // It doesn't have too many series or it will have enough rows for each batch.
768    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
769}
770
771/// Scans file ranges at `index`.
772pub(crate) async fn scan_file_ranges(
773    stream_ctx: Arc<StreamContext>,
774    part_metrics: PartitionMetrics,
775    index: RowGroupIndex,
776    read_type: &'static str,
777    range_builder: Arc<RangeBuilderList>,
778) -> Result<impl Stream<Item = Result<Batch>>> {
779    let mut reader_metrics = ReaderMetrics::default();
780    let ranges = range_builder
781        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
782        .await?;
783    part_metrics.inc_num_file_ranges(ranges.len());
784    part_metrics.merge_reader_metrics(&reader_metrics);
785
786    Ok(build_file_range_scan_stream(
787        stream_ctx,
788        part_metrics,
789        read_type,
790        ranges,
791    ))
792}
793
794/// Scans file ranges at `index` using flat reader that returns RecordBatch.
795pub(crate) async fn scan_flat_file_ranges(
796    stream_ctx: Arc<StreamContext>,
797    part_metrics: PartitionMetrics,
798    index: RowGroupIndex,
799    read_type: &'static str,
800    range_builder: Arc<RangeBuilderList>,
801) -> Result<impl Stream<Item = Result<RecordBatch>>> {
802    let mut reader_metrics = ReaderMetrics::default();
803    let ranges = range_builder
804        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
805        .await?;
806    part_metrics.inc_num_file_ranges(ranges.len());
807    part_metrics.merge_reader_metrics(&reader_metrics);
808
809    Ok(build_flat_file_range_scan_stream(
810        stream_ctx,
811        part_metrics,
812        read_type,
813        ranges,
814    ))
815}
816
817/// Build the stream of scanning the input [`FileRange`]s.
818pub fn build_file_range_scan_stream(
819    stream_ctx: Arc<StreamContext>,
820    part_metrics: PartitionMetrics,
821    read_type: &'static str,
822    ranges: SmallVec<[FileRange; 2]>,
823) -> impl Stream<Item = Result<Batch>> {
824    try_stream! {
825        let reader_metrics = &mut ReaderMetrics::default();
826        for range in ranges {
827            let build_reader_start = Instant::now();
828            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
829            let build_cost = build_reader_start.elapsed();
830            part_metrics.inc_build_reader_cost(build_cost);
831            let compat_batch = range.compat_batch();
832            let mut source = Source::PruneReader(reader);
833            while let Some(mut batch) = source.next_batch().await? {
834                if let Some(compact_batch) = compat_batch {
835                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
836                }
837                yield batch;
838            }
839            if let Source::PruneReader(reader) = source {
840                let prune_metrics = reader.metrics();
841                reader_metrics.merge_from(&prune_metrics);
842            }
843        }
844
845        // Reports metrics.
846        reader_metrics.observe_rows(read_type);
847        reader_metrics.filter_metrics.observe();
848        part_metrics.merge_reader_metrics(reader_metrics);
849    }
850}
851
852/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
853pub fn build_flat_file_range_scan_stream(
854    _stream_ctx: Arc<StreamContext>,
855    part_metrics: PartitionMetrics,
856    read_type: &'static str,
857    ranges: SmallVec<[FileRange; 2]>,
858) -> impl Stream<Item = Result<RecordBatch>> {
859    try_stream! {
860        let reader_metrics = &mut ReaderMetrics::default();
861        for range in ranges {
862            let build_reader_start = Instant::now();
863            let mut reader = range.flat_reader().await?;
864            let build_cost = build_reader_start.elapsed();
865            part_metrics.inc_build_reader_cost(build_cost);
866
867            let may_compat = range
868                .compat_batch()
869                .map(|compat| {
870                    compat.as_flat().context(UnexpectedSnafu {
871                        reason: "Invalid compat for flat format",
872                    })
873                })
874                .transpose()?;
875            while let Some(record_batch) = reader.next_batch()? {
876                if let Some(flat_compat) = may_compat {
877                    let batch = flat_compat.compat(record_batch)?;
878                    yield batch;
879                } else {
880                    yield record_batch;
881                }
882            }
883
884            let prune_metrics = reader.metrics();
885            reader_metrics.merge_from(&prune_metrics);
886        }
887
888        // Reports metrics.
889        reader_metrics.observe_rows(read_type);
890        reader_metrics.filter_metrics.observe();
891        part_metrics.merge_reader_metrics(reader_metrics);
892    }
893}
894
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    // Resolve the range and its reader before `context` is moved into the
    // read call below.
    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)
}
912
913pub(crate) async fn maybe_scan_other_ranges(
914    context: &Arc<StreamContext>,
915    index: RowGroupIndex,
916    metrics: &PartitionMetrics,
917) -> Result<BoxedBatchStream> {
918    #[cfg(feature = "enterprise")]
919    {
920        scan_extension_range(context.clone(), index, metrics.clone()).await
921    }
922
923    #[cfg(not(feature = "enterprise"))]
924    {
925        let _ = context;
926        let _ = index;
927        let _ = metrics;
928
929        crate::error::UnexpectedSnafu {
930            reason: "no other ranges scannable",
931        }
932        .fail()
933    }
934}
935
936pub(crate) async fn maybe_scan_flat_other_ranges(
937    context: &Arc<StreamContext>,
938    index: RowGroupIndex,
939    metrics: &PartitionMetrics,
940) -> Result<BoxedRecordBatchStream> {
941    let _ = context;
942    let _ = index;
943    let _ = metrics;
944
945    crate::error::UnexpectedSnafu {
946        reason: "no other ranges scannable in flat format",
947    }
948    .fail()
949}
950
/// A stream wrapper that splits record batches from an inner stream.
///
/// Batches are split by [`split_record_batch`] so that within each yielded
/// [`RecordBatch`] the timestamps are non-decreasing.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    ///
    /// Split results are queued here and fully drained before `inner` is
    /// polled again.
    batches: VecDeque<RecordBatch>,
}
958
959impl<S> SplitRecordBatchStream<S> {
960    /// Creates a new splitting stream wrapper.
961    pub(crate) fn new(inner: S) -> Self {
962        Self {
963            inner,
964            batches: VecDeque::new(),
965        }
966    }
967}
968
969impl<S> Stream for SplitRecordBatchStream<S>
970where
971    S: Stream<Item = Result<RecordBatch>> + Unpin,
972{
973    type Item = Result<RecordBatch>;
974
975    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
976        loop {
977            // First, check if we have buffered split batches
978            if let Some(batch) = self.batches.pop_front() {
979                return Poll::Ready(Some(Ok(batch)));
980            }
981
982            // Poll the inner stream for the next batch
983            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
984                Some(Ok(batch)) => batch,
985                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
986                None => return Poll::Ready(None),
987            };
988
989            // Split the batch and buffer the results
990            split_record_batch(record_batch, &mut self.batches);
991            // Continue the loop to return the first split batch
992        }
993    }
994}
995
996/// Splits the batch by timestamps.
997///
998/// # Panics
999/// Panics if the timestamp array is invalid.
1000pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1001    let batch_rows = record_batch.num_rows();
1002    if batch_rows == 0 {
1003        return;
1004    }
1005    if batch_rows < 2 {
1006        batches.push_back(record_batch);
1007        return;
1008    }
1009
1010    let time_index_pos = time_index_column_index(record_batch.num_columns());
1011    let timestamps = record_batch.column(time_index_pos);
1012    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1013    let mut offsets = Vec::with_capacity(16);
1014    offsets.push(0);
1015    let values = ts_values.values();
1016    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1017        if value > values[i + 1] {
1018            offsets.push(i + 1);
1019        }
1020    }
1021    offsets.push(values.len());
1022
1023    // Splits the batch by offsets.
1024    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1025        let end = offsets[i + 1];
1026        let rows_in_batch = end - start;
1027        batches.push_back(record_batch.slice(start, rows_in_batch));
1028    }
1029}