Skip to main content

servers/
metrics.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15#[cfg(not(windows))]
16pub(crate) mod jemalloc;
17
18use std::task::{Context, Poll};
19use std::time::Instant;
20
21use axum::extract::{MatchedPath, Request};
22use axum::middleware::Next;
23use axum::response::IntoResponse;
24use lazy_static::lazy_static;
25use prometheus::{
26    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, register_histogram,
27    register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
28};
29use session::context::QueryContext;
30use tonic::body::Body;
31use tower::{Layer, Service};
32
// Label names and label values shared by the metrics registered in this file.

/// Label name for the database a request or query is attributed to.
pub(crate) const METRIC_DB_LABEL: &str = "db";
/// Label name for a status code (the request middlewares in this file fill it with the HTTP
/// status code of the response).
pub(crate) const METRIC_CODE_LABEL: &str = "code";
/// Label name for a request or operation type.
pub(crate) const METRIC_TYPE_LABEL: &str = "type";
/// Label name for the protocol an error is attributed to (used by `METRIC_ERROR_COUNTER`).
pub(crate) const METRIC_PROTOCOL_LABEL: &str = "protocol";
/// Presumably the `protocol` label value for MySQL errors; verify against callers.
pub(crate) const METRIC_ERROR_COUNTER_LABEL_MYSQL: &str = "mysql";
/// Label name for the MySQL sub-protocol (used by `METRIC_MYSQL_QUERY_TIMER`).
pub(crate) const METRIC_MYSQL_SUBPROTOCOL_LABEL: &str = "subprotocol";
/// Presumably the MySQL `subprotocol` label value for binary-protocol queries; verify.
pub(crate) const METRIC_MYSQL_BINQUERY: &str = "binquery";
/// Presumably the MySQL `subprotocol` label value for text-protocol queries; verify.
pub(crate) const METRIC_MYSQL_TEXTQUERY: &str = "textquery";
/// Label name for the PostgreSQL sub-protocol (used by `METRIC_POSTGRES_QUERY_TIMER`).
pub(crate) const METRIC_POSTGRES_SUBPROTOCOL_LABEL: &str = "subprotocol";
/// Presumably the PostgreSQL `subprotocol` label value for simple queries; verify.
pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
/// Presumably the PostgreSQL `subprotocol` label value for extended queries; verify.
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
/// Label name for the HTTP method of a request.
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
/// Label name for the request path.
pub(crate) const METRIC_PATH_LABEL: &str = "path";
/// Label name for the outcome of an operation.
pub(crate) const METRIC_RESULT_LABEL: &str = "result";

/// Presumably the `result` label value for a successful operation; verify against callers.
pub(crate) const METRIC_SUCCESS_VALUE: &str = "success";
/// Presumably the `result` label value for a failed operation; verify against callers.
pub(crate) const METRIC_FAILURE_VALUE: &str = "failure";
50
51lazy_static! {
52
53    pub static ref HTTP_REQUEST_COUNTER: IntCounterVec = register_int_counter_vec!(
54        "greptime_servers_http_request_counter",
55        "servers http request counter",
56        &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL, METRIC_DB_LABEL]
57    ).unwrap();
58
59    pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
60        "greptime_servers_error",
61        "servers error",
62        &[METRIC_PROTOCOL_LABEL]
63    )
64    .unwrap();
65    /// Http SQL query duration per database.
66    pub static ref METRIC_HTTP_SQL_ELAPSED: HistogramVec = register_histogram_vec!(
67        "greptime_servers_http_sql_elapsed",
68        "servers http sql elapsed",
69        &[METRIC_DB_LABEL],
70        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
71    )
72    .unwrap();
73    /// Http pql query duration per database.
74    pub static ref METRIC_HTTP_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!(
75        "greptime_servers_http_promql_elapsed",
76        "servers http promql elapsed",
77        &[METRIC_DB_LABEL],
78        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
79    )
80    .unwrap();
81    /// Http logs query duration per database.
82    pub static ref METRIC_HTTP_LOGS_ELAPSED: HistogramVec = register_histogram_vec!(
83        "greptime_servers_http_logs_elapsed",
84        "servers http logs elapsed",
85        &[METRIC_DB_LABEL],
86        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
87    )
88    .unwrap();
89    pub static ref METRIC_AUTH_FAILURE: IntCounterVec = register_int_counter_vec!(
90        "greptime_servers_auth_failure_count",
91        "servers auth failure count",
92        &[METRIC_CODE_LABEL]
93    )
94    .unwrap();
95    /// Http influxdb write duration per database.
96    pub static ref METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
97        "greptime_servers_http_influxdb_write_elapsed",
98        "servers http influxdb write elapsed",
99        &[METRIC_DB_LABEL],
100        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
101    )
102    .unwrap();
103    /// Http prometheus write duration per database.
104    pub static ref METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
105        "greptime_servers_http_prometheus_write_elapsed",
106        "servers http prometheus write elapsed",
107        &[METRIC_DB_LABEL],
108        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
109    )
110    .unwrap();
111    /// Prometheus remote write codec duration.
112    pub static ref METRIC_HTTP_PROM_STORE_CODEC_ELAPSED: HistogramVec = register_histogram_vec!(
113        "greptime_servers_http_prometheus_codec_elapsed",
114        "servers http prometheus request codec duration",
115        &["type"],
116    )
117    .unwrap();
118    /// Decode duration of prometheus write request.
119    pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
120        .with_label_values(&["decode"]);
121    /// Duration to convert prometheus write request to gRPC request.
122    pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
123        .with_label_values(&["convert"]);
124    /// The samples count of Prometheus remote write.
125    pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounterVec = register_int_counter_vec!(
126        "greptime_servers_prometheus_remote_write_samples",
127        "frontend prometheus remote write samples",
128        &[METRIC_DB_LABEL]
129    )
130    .unwrap();
131    pub static ref PENDING_BATCHES: IntGauge = register_int_gauge!(
132        "greptime_prom_store_pending_batches",
133        "Number of pending batches waiting to be flushed"
134    )
135    .unwrap();
136    pub static ref PENDING_ROWS: IntGauge = register_int_gauge!(
137        "greptime_prom_store_pending_rows",
138        "Number of pending rows waiting to be flushed"
139    )
140    .unwrap();
141    pub static ref PENDING_WORKERS: IntGauge = register_int_gauge!(
142        "greptime_prom_store_pending_workers",
143        "Number of active pending rows batch workers"
144    )
145    .unwrap();
146    pub static ref FLUSH_TOTAL: IntCounter = register_int_counter!(
147        "greptime_prom_store_flush_total",
148        "Total number of batch flushes"
149    )
150    .unwrap();
151    pub static ref FLUSH_ROWS: Histogram = register_histogram!(
152        "greptime_prom_store_flush_rows",
153        "Number of rows per flush",
154        vec![100.0, 1000.0, 10000.0, 50000.0, 100000.0, 500000.0]
155    )
156    .unwrap();
157    pub static ref FLUSH_ELAPSED: Histogram = register_histogram!(
158        "greptime_prom_store_flush_elapsed",
159        "Elapsed time of pending rows batch flush in seconds",
160        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
161    )
162    .unwrap();
163    pub static ref FLUSH_DROPPED_ROWS: IntCounter = register_int_counter!(
164        "greptime_pending_rows_flush_dropped_rows",
165        "Total rows dropped due to pending rows flush failures"
166    )
167    .unwrap();
168    pub static ref FLUSH_FAILURES: IntCounter = register_int_counter!(
169        "greptime_pending_rows_flush_failures",
170        "Total pending rows flush failures"
171    )
172    .unwrap();
173    pub static ref PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
174        "greptime_prom_store_pending_rows_batch_ingest_stage_elapsed",
175        "Elapsed time of pending rows batch ingestion stages in seconds",
176        &["stage"],
177        vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
178    )
179    .unwrap();
180    pub static ref PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
181        "greptime_prom_store_pending_rows_batch_flush_stage_elapsed",
182        "Elapsed time of pending rows batch flush stages in seconds",
183        &["stage"],
184        vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
185    )
186    .unwrap();
187    /// Http prometheus read duration per database.
188    pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!(
189        "greptime_servers_http_prometheus_read_elapsed",
190        "servers http prometheus read elapsed",
191        &[METRIC_DB_LABEL]
192    )
193    .unwrap();
194    /// Http prometheus endpoint query duration per database.
195    pub static ref METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!(
196        "greptime_servers_http_prometheus_promql_elapsed",
197        "servers http prometheus promql elapsed",
198        &[METRIC_DB_LABEL, METRIC_METHOD_LABEL]
199    )
200    .unwrap();
201    pub static ref METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: HistogramVec =
202        register_histogram_vec!(
203            "greptime_servers_http_otlp_metrics_elapsed",
204            "servers_http_otlp_metrics_elapsed",
205            &[METRIC_DB_LABEL]
206        )
207        .unwrap();
208    pub static ref METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: HistogramVec =
209        register_histogram_vec!(
210            "greptime_servers_http_otlp_traces_elapsed",
211            "servers http otlp traces elapsed",
212            &[METRIC_DB_LABEL]
213        )
214        .unwrap();
215    pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec =
216    register_histogram_vec!(
217        "greptime_servers_http_otlp_logs_elapsed",
218        "servers http otlp logs elapsed",
219        &[METRIC_DB_LABEL]
220    )
221    .unwrap();
222    pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
223        "greptime_servers_http_logs_ingestion_counter",
224        "servers http logs ingestion counter",
225        &[METRIC_DB_LABEL]
226    )
227    .unwrap();
228    pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec =
229        register_histogram_vec!(
230            "greptime_servers_http_logs_ingestion_elapsed",
231            "servers http logs ingestion elapsed",
232            &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
233        )
234        .unwrap();
235
236    /// Count of logs ingested into Loki.
237    pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
238        "greptime_servers_loki_logs_ingestion_counter",
239        "servers loki logs ingestion counter",
240        &[METRIC_DB_LABEL]
241    )
242    .unwrap();
243    pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec =
244        register_histogram_vec!(
245            "greptime_servers_loki_logs_ingestion_elapsed",
246            "servers loki logs ingestion elapsed",
247            &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
248        )
249        .unwrap();
250    pub static ref METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED: HistogramVec =
251        register_histogram_vec!(
252            "greptime_servers_elasticsearch_logs_ingestion_elapsed",
253            "servers elasticsearch logs ingestion elapsed",
254            &[METRIC_DB_LABEL]
255        )
256        .unwrap();
257
258    /// Count of documents ingested into Elasticsearch logs.
259    pub static ref METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT: IntCounterVec = register_int_counter_vec!(
260        "greptime_servers_elasticsearch_logs_docs_count",
261        "servers elasticsearch ingest logs docs count",
262        &[METRIC_DB_LABEL]
263    )
264    .unwrap();
265
266    pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
267        register_histogram_vec!(
268            "greptime_servers_http_logs_transform_elapsed",
269            "servers http logs transform elapsed",
270            &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
271        )
272        .unwrap();
273    pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
274        "greptime_servers_mysql_connection_count",
275        "servers mysql connection count"
276    )
277    .unwrap();
278    pub static ref METRIC_MYSQL_QUERY_TIMER: HistogramVec = register_histogram_vec!(
279        "greptime_servers_mysql_query_elapsed",
280        "servers mysql query elapsed",
281        &[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL],
282        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
283    )
284    .unwrap();
285    pub static ref METRIC_MYSQL_PREPARED_COUNT: IntCounterVec = register_int_counter_vec!(
286        "greptime_servers_mysql_prepared_count",
287        "servers mysql prepared count",
288        &[METRIC_DB_LABEL]
289    )
290    .unwrap();
291    pub static ref METRIC_POSTGRES_CONNECTIONS: IntGauge = register_int_gauge!(
292        "greptime_servers_postgres_connection_count",
293        "servers postgres connection count"
294    )
295    .unwrap();
296    pub static ref METRIC_POSTGRES_QUERY_TIMER: HistogramVec = register_histogram_vec!(
297        "greptime_servers_postgres_query_elapsed",
298        "servers postgres query elapsed",
299        &[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL],
300        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
301    )
302    .unwrap();
303    pub static ref METRIC_POSTGRES_PREPARED_COUNT: IntCounter = register_int_counter!(
304        "greptime_servers_postgres_prepared_count",
305        "servers postgres prepared count"
306    )
307    .unwrap();
308    pub static ref METRIC_SERVER_GRPC_DB_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
309        "greptime_servers_grpc_db_request_elapsed",
310        "servers grpc db request elapsed",
311        &[METRIC_DB_LABEL, METRIC_TYPE_LABEL, METRIC_CODE_LABEL]
312    )
313    .unwrap();
314    pub static ref METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
315        "greptime_servers_grpc_prom_request_elapsed",
316        "servers grpc prom request elapsed",
317        &[METRIC_DB_LABEL],
318        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
319    )
320    .unwrap();
321    pub static ref METRIC_HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
322        "greptime_servers_http_requests_total",
323        "servers http requests total",
324        &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL, METRIC_DB_LABEL]
325    )
326    .unwrap();
327    pub static ref METRIC_HTTP_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!(
328        "greptime_servers_http_requests_elapsed",
329        "servers http requests elapsed",
330        &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL, METRIC_DB_LABEL],
331        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
332    )
333    .unwrap();
334    pub static ref METRIC_GRPC_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
335        "greptime_servers_grpc_requests_total",
336        "servers grpc requests total",
337        &[METRIC_PATH_LABEL, METRIC_CODE_LABEL]
338    )
339    .unwrap();
340    pub static ref METRIC_GRPC_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!(
341        "greptime_servers_grpc_requests_elapsed",
342        "servers grpc requests elapsed",
343        &[METRIC_PATH_LABEL, METRIC_CODE_LABEL],
344        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
345    )
346    .unwrap();
347    pub static ref METRIC_JAEGER_QUERY_ELAPSED: HistogramVec = register_histogram_vec!(
348        "greptime_servers_jaeger_query_elapsed",
349        "servers jaeger query elapsed",
350        &[METRIC_DB_LABEL, METRIC_PATH_LABEL]
351    ).unwrap();
352
353    pub static ref GRPC_BULK_INSERT_ELAPSED: Histogram = register_histogram!(
354        "greptime_servers_bulk_insert_elapsed",
355        "servers handle bulk insert elapsed",
356    ).unwrap();
357
358    // Unified request memory metrics
359    /// Current memory in use by all concurrent requests (HTTP, gRPC, Flight).
360    pub static ref REQUEST_MEMORY_IN_USE: IntGauge = register_int_gauge!(
361        "greptime_servers_request_memory_in_use_bytes",
362        "bytes currently reserved for all concurrent request bodies and messages"
363    ).unwrap();
364
365    /// Maximum configured memory for all concurrent requests.
366    pub static ref REQUEST_MEMORY_LIMIT: IntGauge = register_int_gauge!(
367        "greptime_servers_request_memory_limit_bytes",
368        "maximum bytes allowed for all concurrent request bodies and messages"
369    ).unwrap();
370
371    /// Total number of requests rejected due to memory exhaustion.
372    pub static ref REQUEST_MEMORY_REJECTED: IntCounterVec = register_int_counter_vec!(
373        "greptime_servers_request_memory_rejected_total",
374        "number of requests rejected due to memory limit",
375        &["reason"]
376    ).unwrap();
377}
378
379// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs
380// See https://github.com/hyperium/tonic/issues/242
381/// A metrics middleware.
382#[derive(Debug, Clone, Default)]
383pub(crate) struct MetricsMiddlewareLayer;
384
385impl<S> Layer<S> for MetricsMiddlewareLayer {
386    type Service = MetricsMiddleware<S>;
387
388    fn layer(&self, service: S) -> Self::Service {
389        MetricsMiddleware { inner: service }
390    }
391}
392
393#[derive(Debug, Clone)]
394pub(crate) struct MetricsMiddleware<S> {
395    inner: S,
396}
397
398impl<S> Service<http::Request<Body>> for MetricsMiddleware<S>
399where
400    S: Service<http::Request<Body>, Response = http::Response<Body>> + Clone + Send + 'static,
401    S::Future: Send + 'static,
402{
403    type Response = S::Response;
404    type Error = S::Error;
405    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
406
407    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
408        self.inner.poll_ready(cx)
409    }
410
411    fn call(&mut self, req: http::Request<Body>) -> Self::Future {
412        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
413        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
414        // for details on why this is necessary
415        let clone = self.inner.clone();
416        let mut inner = std::mem::replace(&mut self.inner, clone);
417
418        Box::pin(async move {
419            let start = Instant::now();
420            let path = req.uri().path().to_string();
421
422            // Do extra async work here...
423            let response = inner.call(req).await?;
424
425            let latency = start.elapsed().as_secs_f64();
426            let status = response.status().as_u16().to_string();
427
428            let labels = [path.as_str(), status.as_str()];
429            METRIC_GRPC_REQUESTS_TOTAL.with_label_values(&labels).inc();
430            METRIC_GRPC_REQUESTS_ELAPSED
431                .with_label_values(&labels)
432                .observe(latency);
433
434            Ok(response)
435        })
436    }
437}
438
439/// A middleware to record metrics for HTTP.
440// Based on https://github.com/tokio-rs/axum/blob/axum-v0.6.16/examples/prometheus-metrics/src/main.rs
441pub(crate) async fn http_metrics_layer(req: Request, next: Next) -> impl IntoResponse {
442    let start = Instant::now();
443    let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
444        matched_path.as_str().to_string()
445    } else {
446        req.uri().path().to_string()
447    };
448    let method = req.method().clone();
449
450    let db = req
451        .extensions()
452        .get::<QueryContext>()
453        .map(|ctx| ctx.get_db_string())
454        .unwrap_or_else(|| "unknown".to_string());
455
456    let response = next.run(req).await;
457
458    let latency = start.elapsed().as_secs_f64();
459    let status = response.status();
460    let status = status.as_str();
461    let method_str = method.as_str();
462
463    let labels = [method_str, &path, status, db.as_str()];
464    METRIC_HTTP_REQUESTS_TOTAL.with_label_values(&labels).inc();
465    METRIC_HTTP_REQUESTS_ELAPSED
466        .with_label_values(&labels)
467        .observe(latency);
468
469    response
470}