table/table/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use datafusion::physical_plan::metrics::{
18    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, ScopedTimerGuard, Time, Timestamp,
19};
20
21/// This metrics struct is used to record and hold metrics like memory usage
22/// of result batch in [`crate::table::scan::StreamWithMetricWrapper`]
23/// during query execution.
24#[derive(Debug, Clone)]
25pub struct StreamMetrics {
26    /// Timestamp when the stream finished
27    end_time: Timestamp,
28    /// Used memory in bytes
29    mem_used: Gauge,
30    /// Number of rows in output
31    output_rows: Count,
32    /// Elapsed time used to `poll` the stream
33    poll_elapsed: Time,
34    /// Elapsed time used to `.await`ing the stream
35    await_elapsed: Time,
36}
37
38impl StreamMetrics {
39    /// Create a new MemoryUsageMetrics structure, and set `start_time` to now
40    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
41        let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
42        start_time.record();
43
44        Self {
45            end_time: MetricBuilder::new(metrics).end_timestamp(partition),
46            mem_used: MetricBuilder::new(metrics).mem_used(partition),
47            output_rows: MetricBuilder::new(metrics).output_rows(partition),
48            poll_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_poll", partition),
49            await_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_await", partition),
50        }
51    }
52
53    pub fn record_mem_usage(&self, mem_used: usize) {
54        self.mem_used.add(mem_used);
55    }
56
57    pub fn record_output(&self, num_rows: usize) {
58        self.output_rows.add(num_rows);
59    }
60
61    /// Record the end time of the query
62    pub fn try_done(&self) {
63        if self.end_time.value().is_none() {
64            self.end_time.record()
65        }
66    }
67
68    /// Return a timer guard that records the time elapsed in poll
69    pub fn poll_timer(&self) -> ScopedTimerGuard {
70        self.poll_elapsed.timer()
71    }
72
73    pub fn record_await_duration(&self, duration: Duration) {
74        self.await_elapsed.add_duration(duration);
75    }
76}
77
78impl Drop for StreamMetrics {
79    fn drop(&mut self) {
80        self.try_done()
81    }
82}