1use std::pin::Pin;
16use std::task::{Context, Poll};
17
18use common_recordbatch::adapter::RecordBatchMetrics;
19use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
20use datatypes::schema::SchemaRef;
21use futures::Stream;
22use futures_util::ready;
23use lazy_static::lazy_static;
24use prometheus::*;
25
26lazy_static! {
27 pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
29 "greptime_query_stage_elapsed",
30 "query engine time elapsed during each stage",
31 &["stage"],
32 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
33 )
34 .unwrap();
35 pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
36 .with_label_values(&["parse_sql"]);
37 pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
38 .with_label_values(&["parse_promql"]);
39 pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
40 .with_label_values(&["optimize_logicalplan"]);
41 pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
42 .with_label_values(&["optimize_physicalplan"]);
43 pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
44 .with_label_values(&["create_physicalplan"]);
45 pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
46 .with_label_values(&["execute_plan"]);
47 pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
48 .with_label_values(&["merge_scan_poll"]);
49
50 pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
51 "greptime_query_merge_scan_regions",
52 "query merge scan regions"
53 )
54 .unwrap();
55 pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
56 "greptime_query_merge_scan_errors_total",
57 "query merge scan errors total"
58 )
59 .unwrap();
60 pub static ref PUSH_DOWN_FALLBACK_ERRORS_TOTAL: IntCounter = register_int_counter!(
61 "greptime_push_down_fallback_errors_total",
62 "query push down fallback errors total"
63 )
64 .unwrap();
65}
66
67pub struct OnDone<F> {
69 stream: SendableRecordBatchStream,
70 callback: Option<F>,
71}
72
73impl<F> OnDone<F> {
74 pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
76 Self {
77 stream,
78 callback: Some(callback),
79 }
80 }
81}
82
83impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
84 fn name(&self) -> &str {
85 self.stream.name()
86 }
87
88 fn schema(&self) -> SchemaRef {
89 self.stream.schema()
90 }
91
92 fn output_ordering(&self) -> Option<&[OrderOption]> {
93 self.stream.output_ordering()
94 }
95
96 fn metrics(&self) -> Option<RecordBatchMetrics> {
97 self.stream.metrics()
98 }
99}
100
101impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
102 type Item = common_recordbatch::error::Result<RecordBatch>;
103
104 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
106 Some(rb) => Poll::Ready(Some(rb)),
107 None => {
108 if let Some(callback) = self.callback.take() {
109 callback();
110 }
111 Poll::Ready(None)
112 }
113 }
114 }
115
116 fn size_hint(&self) -> (usize, Option<usize>) {
117 self.stream.size_hint()
118 }
119}