// Crate-private `jemalloc` submodule. The `cfg` gate below compiles it on every
// target except Windows; its contents live in a separate file not shown here.
#[cfg(not(windows))]
pub(crate) mod jemalloc;
use std::task::{Context, Poll};
use std::time::Instant;
use axum::extract::MatchedPath;
use axum::http::Request;
use axum::middleware::Next;
use axum::response::IntoResponse;
use hyper::Body;
use lazy_static::lazy_static;
use prometheus::{
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
};
use tonic::body::BoxBody;
use tower::{Layer, Service};
/// Label name `db`; attached to most per-database metrics registered in this file.
pub(crate) const METRIC_DB_LABEL: &str = "db";
/// Label name `code`. The request middlewares in this file fill it with the
/// numeric HTTP status code of the response.
pub(crate) const METRIC_CODE_LABEL: &str = "code";
/// Label name `type`; used by the gRPC database request timer.
pub(crate) const METRIC_TYPE_LABEL: &str = "type";
/// Label name `protocol`; the only label of `METRIC_ERROR_COUNTER`.
pub(crate) const METRIC_PROTOCOL_LABEL: &str = "protocol";
/// Presumably the `protocol` label value reported for MySQL errors — confirm at call sites.
pub(crate) const METRIC_ERROR_COUNTER_LABEL_MYSQL: &str = "mysql";
/// Label name `subprotocol` for the MySQL query timer.
pub(crate) const METRIC_MYSQL_SUBPROTOCOL_LABEL: &str = "subprotocol";
/// Presumably the `subprotocol` value for MySQL binary-protocol queries — confirm at call sites.
pub(crate) const METRIC_MYSQL_BINQUERY: &str = "binquery";
/// Presumably the `subprotocol` value for MySQL text-protocol queries — confirm at call sites.
pub(crate) const METRIC_MYSQL_TEXTQUERY: &str = "textquery";
/// Label name `subprotocol` for the PostgreSQL query timer (same string as the MySQL one).
pub(crate) const METRIC_POSTGRES_SUBPROTOCOL_LABEL: &str = "subprotocol";
/// Presumably the `subprotocol` value for PostgreSQL simple queries — confirm at call sites.
pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
/// Presumably the `subprotocol` value for PostgreSQL extended queries — confirm at call sites.
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
/// Label name `method`. The HTTP middleware in this file fills it with the
/// request's HTTP method.
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
/// Label name `path`; filled with the request path (or matched route) by the
/// request middlewares in this file.
pub(crate) const METRIC_PATH_LABEL: &str = "path";
/// Label name `result`; used by the log ingestion/transform histograms.
pub(crate) const METRIC_RESULT_LABEL: &str = "result";
/// Presumably the `result` label value for successful operations — confirm at call sites.
pub(crate) const METRIC_SUCCESS_VALUE: &str = "success";
/// Presumably the `result` label value for failed operations — confirm at call sites.
pub(crate) const METRIC_FAILURE_VALUE: &str = "failure";
/// Bucket upper bounds shared by every histogram in this file that overrides
/// the prometheus default buckets.
///
/// The HTTP and gRPC request histograms observe `Duration::as_secs_f64()`, so
/// for them these bounds span 5 ms to 5 min. The other users are presumably
/// observed in seconds as well — confirm at their call sites.
const SERVER_LATENCY_BUCKETS: [f64; 10] = [
    0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0,
];

// Process-wide metrics, registered with the default prometheus registry the
// first time each static is dereferenced. Every registration is `unwrap`ped,
// so a failed registration (for example a duplicated metric name) panics on
// first use rather than being silently ignored.
lazy_static! {
    /// Counter `greptime_servers_error`, labelled by `protocol`.
    pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_error",
        "servers error",
        &[METRIC_PROTOCOL_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_http_sql_elapsed`, labelled by `db`.
    pub static ref METRIC_HTTP_SQL_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_sql_elapsed",
        "servers http sql elapsed",
        &[METRIC_DB_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();

    /// Histogram `greptime_servers_http_promql_elapsed`, labelled by `db`.
    pub static ref METRIC_HTTP_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_promql_elapsed",
        "servers http promql elapsed",
        &[METRIC_DB_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();

    /// Histogram `greptime_servers_http_logs_elapsed`, labelled by `db`.
    pub static ref METRIC_HTTP_LOGS_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_logs_elapsed",
        "servers http logs elapsed",
        &[METRIC_DB_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();

    /// Counter `greptime_servers_auth_failure_count`, labelled by `code`.
    pub static ref METRIC_AUTH_FAILURE: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_auth_failure_count",
        "servers auth failure count",
        &[METRIC_CODE_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_http_influxdb_write_elapsed`, labelled by `db`.
    pub static ref METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_influxdb_write_elapsed",
        "servers http influxdb write elapsed",
        &[METRIC_DB_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();

    /// Histogram `greptime_servers_http_prometheus_write_elapsed`, labelled by `db`.
    pub static ref METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_prometheus_write_elapsed",
        "servers http prometheus write elapsed",
        &[METRIC_DB_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();

    /// Histogram `greptime_servers_http_prometheus_codec_elapsed`, labelled by
    /// `type`. The two statics that follow are its pre-resolved children.
    pub static ref METRIC_HTTP_PROM_STORE_CODEC_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_prometheus_codec_elapsed",
        "servers http prometheus request codec duration",
        &[METRIC_TYPE_LABEL]
    )
    .unwrap();

    /// Child of [`METRIC_HTTP_PROM_STORE_CODEC_ELAPSED`] with `type = "decode"`,
    /// resolved once so callers skip the label lookup.
    pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram =
        METRIC_HTTP_PROM_STORE_CODEC_ELAPSED.with_label_values(&["decode"]);

    /// Child of [`METRIC_HTTP_PROM_STORE_CODEC_ELAPSED`] with `type = "convert"`,
    /// resolved once so callers skip the label lookup.
    pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram =
        METRIC_HTTP_PROM_STORE_CODEC_ELAPSED.with_label_values(&["convert"]);

    /// Unlabelled counter `greptime_servers_prometheus_remote_write_samples`.
    pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
        "greptime_servers_prometheus_remote_write_samples",
        "frontend prometheus remote write samples"
    )
    .unwrap();

    /// Histogram `greptime_servers_http_prometheus_read_elapsed`, labelled by
    /// `db`; uses the prometheus default buckets.
    pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_prometheus_read_elapsed",
        "servers http prometheus read elapsed",
        &[METRIC_DB_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_http_prometheus_promql_elapsed`, labelled by
    /// `db` and `method`; uses the prometheus default buckets.
    pub static ref METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_prometheus_promql_elapsed",
        "servers http prometheus promql elapsed",
        &[METRIC_DB_LABEL, METRIC_METHOD_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_http_otlp_metrics_elapsed`, labelled by `db`;
    /// uses the prometheus default buckets.
    // NOTE(review): the help text below uses underscores, unlike its siblings.
    // It is kept verbatim so the exposition output does not change.
    pub static ref METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: HistogramVec =
        register_histogram_vec!(
            "greptime_servers_http_otlp_metrics_elapsed",
            "servers_http_otlp_metrics_elapsed",
            &[METRIC_DB_LABEL]
        )
        .unwrap();

    /// Histogram `greptime_servers_http_otlp_traces_elapsed`, labelled by `db`;
    /// uses the prometheus default buckets.
    pub static ref METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: HistogramVec =
        register_histogram_vec!(
            "greptime_servers_http_otlp_traces_elapsed",
            "servers http otlp traces elapsed",
            &[METRIC_DB_LABEL]
        )
        .unwrap();

    /// Histogram `greptime_servers_http_otlp_logs_elapsed`, labelled by `db`;
    /// uses the prometheus default buckets.
    pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec =
        register_histogram_vec!(
            "greptime_servers_http_otlp_logs_elapsed",
            "servers http otlp logs elapsed",
            &[METRIC_DB_LABEL]
        )
        .unwrap();

    /// Counter `greptime_servers_http_logs_ingestion_counter`, labelled by `db`.
    pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_http_logs_ingestion_counter",
        "servers http logs ingestion counter",
        &[METRIC_DB_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_http_logs_ingestion_elapsed`, labelled by
    /// `db` and `result`; uses the prometheus default buckets.
    pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec =
        register_histogram_vec!(
            "greptime_servers_http_logs_ingestion_elapsed",
            "servers http logs ingestion elapsed",
            &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
        )
        .unwrap();

    /// Counter `greptime_servers_loki_logs_ingestion_counter`, labelled by `db`.
    pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_loki_logs_ingestion_counter",
        "servers loki logs ingestion counter",
        &[METRIC_DB_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_loki_logs_ingestion_elapsed`, labelled by
    /// `db` and `result`; uses the prometheus default buckets.
    pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec =
        register_histogram_vec!(
            "greptime_servers_loki_logs_ingestion_elapsed",
            "servers loki logs ingestion elapsed",
            &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
        )
        .unwrap();

    /// Histogram `greptime_servers_http_logs_transform_elapsed`, labelled by
    /// `db` and `result`; uses the prometheus default buckets.
    pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
        register_histogram_vec!(
            "greptime_servers_http_logs_transform_elapsed",
            "servers http logs transform elapsed",
            &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
        )
        .unwrap();

    /// Unlabelled gauge `greptime_servers_mysql_connection_count`.
    pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
        "greptime_servers_mysql_connection_count",
        "servers mysql connection count"
    )
    .unwrap();

    /// Histogram `greptime_servers_mysql_query_elapsed`, labelled by
    /// `subprotocol` and `db`; uses the prometheus default buckets.
    pub static ref METRIC_MYSQL_QUERY_TIMER: HistogramVec = register_histogram_vec!(
        "greptime_servers_mysql_query_elapsed",
        "servers mysql query elapsed",
        &[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL]
    )
    .unwrap();

    /// Counter `greptime_servers_mysql_prepared_count`, labelled by `db`.
    pub static ref METRIC_MYSQL_PREPARED_COUNT: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_mysql_prepared_count",
        "servers mysql prepared count",
        &[METRIC_DB_LABEL]
    )
    .unwrap();

    /// Unlabelled gauge `greptime_servers_postgres_connection_count`.
    pub static ref METRIC_POSTGRES_CONNECTIONS: IntGauge = register_int_gauge!(
        "greptime_servers_postgres_connection_count",
        "servers postgres connection count"
    )
    .unwrap();

    /// Histogram `greptime_servers_postgres_query_elapsed`, labelled by
    /// `subprotocol` and `db`; uses the prometheus default buckets.
    pub static ref METRIC_POSTGRES_QUERY_TIMER: HistogramVec = register_histogram_vec!(
        "greptime_servers_postgres_query_elapsed",
        "servers postgres query elapsed",
        &[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL]
    )
    .unwrap();

    /// Unlabelled counter `greptime_servers_postgres_prepared_count`.
    pub static ref METRIC_POSTGRES_PREPARED_COUNT: IntCounter = register_int_counter!(
        "greptime_servers_postgres_prepared_count",
        "servers postgres prepared count"
    )
    .unwrap();

    /// Histogram `greptime_servers_grpc_db_request_elapsed`, labelled by `db`,
    /// `type` and `code`; uses the prometheus default buckets.
    pub static ref METRIC_SERVER_GRPC_DB_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
        "greptime_servers_grpc_db_request_elapsed",
        "servers grpc db request elapsed",
        &[METRIC_DB_LABEL, METRIC_TYPE_LABEL, METRIC_CODE_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_grpc_prom_request_elapsed`, labelled by `db`;
    /// uses the prometheus default buckets.
    pub static ref METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
        "greptime_servers_grpc_prom_request_elapsed",
        "servers grpc prom request elapsed",
        &[METRIC_DB_LABEL]
    )
    .unwrap();

    /// Counter `greptime_servers_http_requests_total`, labelled by `method`,
    /// `path` and `code`. Incremented by `http_metrics_layer`.
    pub static ref METRIC_HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_http_requests_total",
        "servers http requests total",
        &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_http_requests_elapsed`, labelled by `method`,
    /// `path` and `code`. Observed in seconds by `http_metrics_layer`.
    pub static ref METRIC_HTTP_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_requests_elapsed",
        "servers http requests elapsed",
        &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();

    /// Counter `greptime_servers_grpc_requests_total`, labelled by `path` and
    /// `code`. Incremented by `MetricsMiddleware`.
    pub static ref METRIC_GRPC_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_grpc_requests_total",
        "servers grpc requests total",
        &[METRIC_PATH_LABEL, METRIC_CODE_LABEL]
    )
    .unwrap();

    /// Histogram `greptime_servers_grpc_requests_elapsed`, labelled by `path`
    /// and `code`. Observed in seconds by `MetricsMiddleware`.
    pub static ref METRIC_GRPC_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_grpc_requests_elapsed",
        "servers grpc requests elapsed",
        &[METRIC_PATH_LABEL, METRIC_CODE_LABEL],
        SERVER_LATENCY_BUCKETS.to_vec()
    )
    .unwrap();
}
/// Stateless tower [`Layer`] whose `layer` method wraps a service in
/// [`MetricsMiddleware`].
#[derive(Debug, Clone, Default)]
pub(crate) struct MetricsMiddlewareLayer;
/// Lets [`MetricsMiddlewareLayer`] be stacked onto any service `S`.
impl<S> Layer<S> for MetricsMiddlewareLayer {
    type Service = MetricsMiddleware<S>;

    /// Takes ownership of `inner` and returns it wrapped in a
    /// [`MetricsMiddleware`]; the layer itself carries no state.
    fn layer(&self, inner: S) -> Self::Service {
        MetricsMiddleware { inner }
    }
}
/// Service wrapper produced by `MetricsMiddlewareLayer`. Its `Service` impl
/// records the gRPC request counter and latency histogram around each call to
/// the wrapped service.
#[derive(Debug, Clone)]
pub(crate) struct MetricsMiddleware<S> {
// The wrapped service that actually handles each request.
inner: S,
}
/// Records `METRIC_GRPC_REQUESTS_TOTAL` and `METRIC_GRPC_REQUESTS_ELAPSED` for
/// every request that the wrapped service answers successfully.
impl<S> Service<hyper::Request<Body>> for MetricsMiddleware<S>
where
    S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Readiness is delegated unchanged to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Forwards `req` to the wrapped service and, once it resolves to a
    /// response, bumps the request counter and observes the elapsed seconds.
    ///
    /// The labels are the raw URI path and the response's HTTP status code.
    /// If the wrapped service returns an error, it is propagated as-is and no
    /// metric is recorded for that request.
    fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
        // Move the instance that `poll_ready` was called on into the future and
        // leave a fresh clone behind: the clone is not guaranteed to be ready.
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);

        Box::pin(async move {
            let start = Instant::now();
            // The path must be owned because `req` is consumed by the inner call.
            let path = req.uri().path().to_owned();

            let response = inner.call(req).await?;

            let latency = start.elapsed().as_secs_f64();
            // `StatusCode::as_str` yields the same three-digit text as
            // `as_u16().to_string()` without allocating on every request.
            let status = response.status();
            let labels = [path.as_str(), status.as_str()];

            METRIC_GRPC_REQUESTS_TOTAL.with_label_values(&labels).inc();
            METRIC_GRPC_REQUESTS_ELAPSED
                .with_label_values(&labels)
                .observe(latency);

            Ok(response)
        })
    }
}
/// Axum middleware that counts and times every HTTP request.
///
/// Runs the rest of the stack via `next`, then increments
/// `METRIC_HTTP_REQUESTS_TOTAL` and observes the elapsed seconds in
/// `METRIC_HTTP_REQUESTS_ELAPSED`. Both are labelled with the request method,
/// the path and the response status code. The response is returned untouched.
///
/// The path label is the route template from [`MatchedPath`] when the router
/// provides one, which keeps parameterised routes under a single label value.
/// Otherwise it falls back to the raw URI path.
pub(crate) async fn http_metrics_layer<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse {
    let start = Instant::now();

    // Capture path and method up front: `req` is moved into `next.run`.
    let path = req.extensions().get::<MatchedPath>().map_or_else(
        || req.uri().path().to_owned(),
        |matched| matched.as_str().to_owned(),
    );
    let method = req.method().clone();

    let response = next.run(req).await;

    let latency = start.elapsed().as_secs_f64();
    // `Method::as_str` and `StatusCode::as_str` produce the same text as their
    // `to_string()` forms, without allocating two strings per request.
    let status = response.status();
    let labels = [method.as_str(), path.as_str(), status.as_str()];

    METRIC_HTTP_REQUESTS_TOTAL.with_label_values(&labels).inc();
    METRIC_HTTP_REQUESTS_ELAPSED
        .with_label_values(&labels)
        .observe(latency);

    response
}