query/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::task::{Context, Poll};
17
18use common_recordbatch::adapter::RecordBatchMetrics;
19use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
20use datatypes::schema::SchemaRef;
21use futures::Stream;
22use futures_util::ready;
23use lazy_static::lazy_static;
24use prometheus::*;
25
26lazy_static! {
27    /// Timer of different stages in query.
28    pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
29        "greptime_query_stage_elapsed",
30        "query engine time elapsed during each stage",
31        &["stage"],
32        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
33    )
34    .unwrap();
35    pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
36        .with_label_values(&["parse_sql"]);
37    pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
38        .with_label_values(&["parse_promql"]);
39    pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
40        .with_label_values(&["optimize_logicalplan"]);
41    pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
42        .with_label_values(&["optimize_physicalplan"]);
43    pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
44        .with_label_values(&["create_physicalplan"]);
45    pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
46        .with_label_values(&["execute_plan"]);
47    pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
48        .with_label_values(&["merge_scan_poll"]);
49
50    pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
51        "greptime_query_merge_scan_regions",
52        "query merge scan regions"
53    )
54    .unwrap();
55    pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
56        "greptime_query_merge_scan_errors_total",
57        "query merge scan errors total"
58    )
59    .unwrap();
60}
61
62/// A stream to call the callback once a RecordBatch stream is done.
63pub struct OnDone<F> {
64    stream: SendableRecordBatchStream,
65    callback: Option<F>,
66}
67
68impl<F> OnDone<F> {
69    /// Attaches a `callback` to invoke once the `stream` is terminated.
70    pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
71        Self {
72            stream,
73            callback: Some(callback),
74        }
75    }
76}
77
78impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
79    fn name(&self) -> &str {
80        self.stream.name()
81    }
82
83    fn schema(&self) -> SchemaRef {
84        self.stream.schema()
85    }
86
87    fn output_ordering(&self) -> Option<&[OrderOption]> {
88        self.stream.output_ordering()
89    }
90
91    fn metrics(&self) -> Option<RecordBatchMetrics> {
92        self.stream.metrics()
93    }
94}
95
96impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
97    type Item = common_recordbatch::error::Result<RecordBatch>;
98
99    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100        match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
101            Some(rb) => Poll::Ready(Some(rb)),
102            None => {
103                if let Some(callback) = self.callback.take() {
104                    callback();
105                }
106                Poll::Ready(None)
107            }
108        }
109    }
110
111    fn size_hint(&self) -> (usize, Option<usize>) {
112        self.stream.size_hint()
113    }
114}