1use std::time::Duration;
16
17use datafusion::physical_plan::metrics::{
18 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, ScopedTimerGuard, Time, Timestamp,
19};
20
21#[derive(Debug, Clone)]
25pub struct StreamMetrics {
26 end_time: Timestamp,
28 mem_used: Gauge,
30 output_rows: Count,
32 poll_elapsed: Time,
34 await_elapsed: Time,
36}
37
38impl StreamMetrics {
39 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
41 let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
42 start_time.record();
43
44 Self {
45 end_time: MetricBuilder::new(metrics).end_timestamp(partition),
46 mem_used: MetricBuilder::new(metrics).mem_used(partition),
47 output_rows: MetricBuilder::new(metrics).output_rows(partition),
48 poll_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_poll", partition),
49 await_elapsed: MetricBuilder::new(metrics).subset_time("elapsed_await", partition),
50 }
51 }
52
53 pub fn record_mem_usage(&self, mem_used: usize) {
54 self.mem_used.add(mem_used);
55 }
56
57 pub fn record_output(&self, num_rows: usize) {
58 self.output_rows.add(num_rows);
59 }
60
61 pub fn try_done(&self) {
63 if self.end_time.value().is_none() {
64 self.end_time.record()
65 }
66 }
67
68 pub fn poll_timer(&self) -> ScopedTimerGuard {
70 self.poll_elapsed.timer()
71 }
72
73 pub fn record_await_duration(&self, duration: Duration) {
74 self.await_elapsed.add_duration(duration);
75 }
76}
77
78impl Drop for StreamMetrics {
79 fn drop(&mut self) {
80 self.try_done()
81 }
82}