1use std::pin::Pin;
16use std::task::{Context, Poll};
17
18use common_recordbatch::adapter::RecordBatchMetrics;
19use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
20use datatypes::schema::SchemaRef;
21use futures::Stream;
22use futures_util::ready;
23use lazy_static::lazy_static;
24use prometheus::*;
25
26lazy_static! {
27 pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
29 "greptime_query_stage_elapsed",
30 "query engine time elapsed during each stage",
31 &["stage"],
32 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
33 )
34 .unwrap();
35 pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
36 .with_label_values(&["parse_sql"]);
37 pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
38 .with_label_values(&["parse_promql"]);
39 pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
40 .with_label_values(&["optimize_logicalplan"]);
41 pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
42 .with_label_values(&["optimize_physicalplan"]);
43 pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
44 .with_label_values(&["create_physicalplan"]);
45 pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
46 .with_label_values(&["execute_plan"]);
47 pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
48 .with_label_values(&["merge_scan_poll"]);
49
50 pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
51 "greptime_query_merge_scan_regions",
52 "query merge scan regions"
53 )
54 .unwrap();
55 pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
56 "greptime_query_merge_scan_errors_total",
57 "query merge scan errors total"
58 )
59 .unwrap();
60 pub static ref PUSH_DOWN_FALLBACK_ERRORS_TOTAL: IntCounter = register_int_counter!(
61 "greptime_push_down_fallback_errors_total",
62 "query push down fallback errors total"
63 )
64 .unwrap();
65
66 pub static ref QUERY_MEMORY_POOL_USAGE_BYTES: IntGauge = register_int_gauge!(
67 "greptime_query_memory_pool_usage_bytes",
68 "current query memory pool usage in bytes"
69 )
70 .unwrap();
71
72 pub static ref QUERY_MEMORY_POOL_REJECTED_TOTAL: IntCounter = register_int_counter!(
73 "greptime_query_memory_pool_rejected_total",
74 "total number of query memory allocations rejected"
75 )
76 .unwrap();
77}
78
79pub struct OnDone<F> {
81 stream: SendableRecordBatchStream,
82 callback: Option<F>,
83}
84
85impl<F> OnDone<F> {
86 pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
88 Self {
89 stream,
90 callback: Some(callback),
91 }
92 }
93}
94
95impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
96 fn name(&self) -> &str {
97 self.stream.name()
98 }
99
100 fn schema(&self) -> SchemaRef {
101 self.stream.schema()
102 }
103
104 fn output_ordering(&self) -> Option<&[OrderOption]> {
105 self.stream.output_ordering()
106 }
107
108 fn metrics(&self) -> Option<RecordBatchMetrics> {
109 self.stream.metrics()
110 }
111}
112
113impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
114 type Item = common_recordbatch::error::Result<RecordBatch>;
115
116 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
117 match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
118 Some(rb) => Poll::Ready(Some(rb)),
119 None => {
120 if let Some(callback) = self.callback.take() {
121 callback();
122 }
123 Poll::Ready(None)
124 }
125 }
126 }
127
128 fn size_hint(&self) -> (usize, Option<usize>) {
129 self.stream.size_hint()
130 }
131}