1use std::pin::Pin;
16use std::task::{Context, Poll};
17
18use common_recordbatch::adapter::RecordBatchMetrics;
19use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
20use datatypes::schema::SchemaRef;
21use futures::Stream;
22use futures_util::ready;
23use lazy_static::lazy_static;
24use prometheus::*;
25
26lazy_static! {
27 pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
29 "greptime_query_stage_elapsed",
30 "query engine time elapsed during each stage",
31 &["stage"],
32 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
33 )
34 .unwrap();
35 pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
36 .with_label_values(&["parse_sql"]);
37 pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
38 .with_label_values(&["parse_promql"]);
39 pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
40 .with_label_values(&["optimize_logicalplan"]);
41 pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
42 .with_label_values(&["optimize_physicalplan"]);
43 pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
44 .with_label_values(&["create_physicalplan"]);
45 pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
46 .with_label_values(&["execute_plan"]);
47 pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
48 .with_label_values(&["merge_scan_poll"]);
49
50 pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
51 "greptime_query_merge_scan_regions",
52 "query merge scan regions"
53 )
54 .unwrap();
55 pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
56 "greptime_query_merge_scan_errors_total",
57 "query merge scan errors total"
58 )
59 .unwrap();
60}
61
62pub struct OnDone<F> {
64 stream: SendableRecordBatchStream,
65 callback: Option<F>,
66}
67
68impl<F> OnDone<F> {
69 pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
71 Self {
72 stream,
73 callback: Some(callback),
74 }
75 }
76}
77
78impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
79 fn name(&self) -> &str {
80 self.stream.name()
81 }
82
83 fn schema(&self) -> SchemaRef {
84 self.stream.schema()
85 }
86
87 fn output_ordering(&self) -> Option<&[OrderOption]> {
88 self.stream.output_ordering()
89 }
90
91 fn metrics(&self) -> Option<RecordBatchMetrics> {
92 self.stream.metrics()
93 }
94}
95
96impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
97 type Item = common_recordbatch::error::Result<RecordBatch>;
98
99 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100 match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
101 Some(rb) => Poll::Ready(Some(rb)),
102 None => {
103 if let Some(callback) = self.callback.take() {
104 callback();
105 }
106 Poll::Ready(None)
107 }
108 }
109 }
110
111 fn size_hint(&self) -> (usize, Option<usize>) {
112 self.stream.size_hint()
113 }
114}