1#[cfg(not(windows))]
16pub(crate) mod jemalloc;
17
18use std::task::{Context, Poll};
19use std::time::Instant;
20
21use axum::extract::{MatchedPath, Request};
22use axum::middleware::Next;
23use axum::response::IntoResponse;
24use lazy_static::lazy_static;
25use prometheus::{
26 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, register_histogram,
27 register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
28};
29use session::context::QueryContext;
30use tonic::body::Body;
31use tower::{Layer, Service};
32
// Label names attached to the metrics defined in this module.

/// Label name: `db`.
pub(crate) const METRIC_DB_LABEL: &str = "db";
/// Label name: `code`.
pub(crate) const METRIC_CODE_LABEL: &str = "code";
/// Label name: `type`.
pub(crate) const METRIC_TYPE_LABEL: &str = "type";
/// Label name: `protocol`.
pub(crate) const METRIC_PROTOCOL_LABEL: &str = "protocol";
/// Label value `mysql`.
// NOTE(review): presumably the `protocol` value used with the error counter — confirm at call sites.
pub(crate) const METRIC_ERROR_COUNTER_LABEL_MYSQL: &str = "mysql";
/// Label name `subprotocol` for MySQL metrics.
pub(crate) const METRIC_MYSQL_SUBPROTOCOL_LABEL: &str = "subprotocol";
/// Label value `binquery`.
// NOTE(review): presumably a value for the MySQL `subprotocol` label — confirm at call sites.
pub(crate) const METRIC_MYSQL_BINQUERY: &str = "binquery";
/// Label value `textquery`.
// NOTE(review): presumably a value for the MySQL `subprotocol` label — confirm at call sites.
pub(crate) const METRIC_MYSQL_TEXTQUERY: &str = "textquery";
/// Label name `subprotocol` for PostgreSQL metrics (same string as the MySQL one).
pub(crate) const METRIC_POSTGRES_SUBPROTOCOL_LABEL: &str = "subprotocol";
/// Label value `simple`.
// NOTE(review): presumably a value for the PostgreSQL `subprotocol` label — confirm at call sites.
pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
/// Label value `extended`.
// NOTE(review): presumably a value for the PostgreSQL `subprotocol` label — confirm at call sites.
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
/// Label name: `method`.
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
/// Label name: `path`.
pub(crate) const METRIC_PATH_LABEL: &str = "path";
/// Label name: `result`.
pub(crate) const METRIC_RESULT_LABEL: &str = "result";

/// Label value `success`.
// NOTE(review): presumably paired with the `result` label — confirm at call sites.
pub(crate) const METRIC_SUCCESS_VALUE: &str = "success";
/// Label value `failure`.
// NOTE(review): presumably paired with the `result` label — confirm at call sites.
pub(crate) const METRIC_FAILURE_VALUE: &str = "failure";
50
51lazy_static! {
52
53 pub static ref HTTP_REQUEST_COUNTER: IntCounterVec = register_int_counter_vec!(
54 "greptime_servers_http_request_counter",
55 "servers http request counter",
56 &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL, METRIC_DB_LABEL]
57 ).unwrap();
58
59 pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
60 "greptime_servers_error",
61 "servers error",
62 &[METRIC_PROTOCOL_LABEL]
63 )
64 .unwrap();
65 pub static ref METRIC_HTTP_SQL_ELAPSED: HistogramVec = register_histogram_vec!(
67 "greptime_servers_http_sql_elapsed",
68 "servers http sql elapsed",
69 &[METRIC_DB_LABEL],
70 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
71 )
72 .unwrap();
73 pub static ref METRIC_HTTP_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!(
75 "greptime_servers_http_promql_elapsed",
76 "servers http promql elapsed",
77 &[METRIC_DB_LABEL],
78 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
79 )
80 .unwrap();
81 pub static ref METRIC_HTTP_LOGS_ELAPSED: HistogramVec = register_histogram_vec!(
83 "greptime_servers_http_logs_elapsed",
84 "servers http logs elapsed",
85 &[METRIC_DB_LABEL],
86 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
87 )
88 .unwrap();
89 pub static ref METRIC_AUTH_FAILURE: IntCounterVec = register_int_counter_vec!(
90 "greptime_servers_auth_failure_count",
91 "servers auth failure count",
92 &[METRIC_CODE_LABEL]
93 )
94 .unwrap();
95 pub static ref METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
97 "greptime_servers_http_influxdb_write_elapsed",
98 "servers http influxdb write elapsed",
99 &[METRIC_DB_LABEL],
100 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
101 )
102 .unwrap();
103 pub static ref METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
105 "greptime_servers_http_prometheus_write_elapsed",
106 "servers http prometheus write elapsed",
107 &[METRIC_DB_LABEL],
108 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
109 )
110 .unwrap();
111 pub static ref METRIC_HTTP_PROM_STORE_CODEC_ELAPSED: HistogramVec = register_histogram_vec!(
113 "greptime_servers_http_prometheus_codec_elapsed",
114 "servers http prometheus request codec duration",
115 &["type"],
116 )
117 .unwrap();
118 pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
120 .with_label_values(&["decode"]);
121 pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
123 .with_label_values(&["convert"]);
124 pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounterVec = register_int_counter_vec!(
126 "greptime_servers_prometheus_remote_write_samples",
127 "frontend prometheus remote write samples",
128 &[METRIC_DB_LABEL]
129 )
130 .unwrap();
131 pub static ref PENDING_BATCHES: IntGauge = register_int_gauge!(
132 "greptime_prom_store_pending_batches",
133 "Number of pending batches waiting to be flushed"
134 )
135 .unwrap();
136 pub static ref PENDING_ROWS: IntGauge = register_int_gauge!(
137 "greptime_prom_store_pending_rows",
138 "Number of pending rows waiting to be flushed"
139 )
140 .unwrap();
141 pub static ref PENDING_WORKERS: IntGauge = register_int_gauge!(
142 "greptime_prom_store_pending_workers",
143 "Number of active pending rows batch workers"
144 )
145 .unwrap();
146 pub static ref FLUSH_TOTAL: IntCounter = register_int_counter!(
147 "greptime_prom_store_flush_total",
148 "Total number of batch flushes"
149 )
150 .unwrap();
151 pub static ref FLUSH_ROWS: Histogram = register_histogram!(
152 "greptime_prom_store_flush_rows",
153 "Number of rows per flush",
154 vec![100.0, 1000.0, 10000.0, 50000.0, 100000.0, 500000.0]
155 )
156 .unwrap();
157 pub static ref FLUSH_ELAPSED: Histogram = register_histogram!(
158 "greptime_prom_store_flush_elapsed",
159 "Elapsed time of pending rows batch flush in seconds",
160 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
161 )
162 .unwrap();
163 pub static ref FLUSH_DROPPED_ROWS: IntCounter = register_int_counter!(
164 "greptime_pending_rows_flush_dropped_rows",
165 "Total rows dropped due to pending rows flush failures"
166 )
167 .unwrap();
168 pub static ref FLUSH_FAILURES: IntCounter = register_int_counter!(
169 "greptime_pending_rows_flush_failures",
170 "Total pending rows flush failures"
171 )
172 .unwrap();
173 pub static ref PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
174 "greptime_prom_store_pending_rows_batch_ingest_stage_elapsed",
175 "Elapsed time of pending rows batch ingestion stages in seconds",
176 &["stage"],
177 vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
178 )
179 .unwrap();
180 pub static ref PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
181 "greptime_prom_store_pending_rows_batch_flush_stage_elapsed",
182 "Elapsed time of pending rows batch flush stages in seconds",
183 &["stage"],
184 vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
185 )
186 .unwrap();
187 pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!(
189 "greptime_servers_http_prometheus_read_elapsed",
190 "servers http prometheus read elapsed",
191 &[METRIC_DB_LABEL]
192 )
193 .unwrap();
194 pub static ref METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!(
196 "greptime_servers_http_prometheus_promql_elapsed",
197 "servers http prometheus promql elapsed",
198 &[METRIC_DB_LABEL, METRIC_METHOD_LABEL]
199 )
200 .unwrap();
201 pub static ref METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: HistogramVec =
202 register_histogram_vec!(
203 "greptime_servers_http_otlp_metrics_elapsed",
204 "servers_http_otlp_metrics_elapsed",
205 &[METRIC_DB_LABEL]
206 )
207 .unwrap();
208 pub static ref METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: HistogramVec =
209 register_histogram_vec!(
210 "greptime_servers_http_otlp_traces_elapsed",
211 "servers http otlp traces elapsed",
212 &[METRIC_DB_LABEL]
213 )
214 .unwrap();
215 pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec =
216 register_histogram_vec!(
217 "greptime_servers_http_otlp_logs_elapsed",
218 "servers http otlp logs elapsed",
219 &[METRIC_DB_LABEL]
220 )
221 .unwrap();
222 pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
223 "greptime_servers_http_logs_ingestion_counter",
224 "servers http logs ingestion counter",
225 &[METRIC_DB_LABEL]
226 )
227 .unwrap();
228 pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec =
229 register_histogram_vec!(
230 "greptime_servers_http_logs_ingestion_elapsed",
231 "servers http logs ingestion elapsed",
232 &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
233 )
234 .unwrap();
235
236 pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
238 "greptime_servers_loki_logs_ingestion_counter",
239 "servers loki logs ingestion counter",
240 &[METRIC_DB_LABEL]
241 )
242 .unwrap();
243 pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec =
244 register_histogram_vec!(
245 "greptime_servers_loki_logs_ingestion_elapsed",
246 "servers loki logs ingestion elapsed",
247 &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
248 )
249 .unwrap();
250 pub static ref METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED: HistogramVec =
251 register_histogram_vec!(
252 "greptime_servers_elasticsearch_logs_ingestion_elapsed",
253 "servers elasticsearch logs ingestion elapsed",
254 &[METRIC_DB_LABEL]
255 )
256 .unwrap();
257
258 pub static ref METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT: IntCounterVec = register_int_counter_vec!(
260 "greptime_servers_elasticsearch_logs_docs_count",
261 "servers elasticsearch ingest logs docs count",
262 &[METRIC_DB_LABEL]
263 )
264 .unwrap();
265
266 pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
267 register_histogram_vec!(
268 "greptime_servers_http_logs_transform_elapsed",
269 "servers http logs transform elapsed",
270 &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
271 )
272 .unwrap();
273 pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
274 "greptime_servers_mysql_connection_count",
275 "servers mysql connection count"
276 )
277 .unwrap();
278 pub static ref METRIC_MYSQL_QUERY_TIMER: HistogramVec = register_histogram_vec!(
279 "greptime_servers_mysql_query_elapsed",
280 "servers mysql query elapsed",
281 &[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL],
282 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
283 )
284 .unwrap();
285 pub static ref METRIC_MYSQL_PREPARED_COUNT: IntCounterVec = register_int_counter_vec!(
286 "greptime_servers_mysql_prepared_count",
287 "servers mysql prepared count",
288 &[METRIC_DB_LABEL]
289 )
290 .unwrap();
291 pub static ref METRIC_POSTGRES_CONNECTIONS: IntGauge = register_int_gauge!(
292 "greptime_servers_postgres_connection_count",
293 "servers postgres connection count"
294 )
295 .unwrap();
296 pub static ref METRIC_POSTGRES_QUERY_TIMER: HistogramVec = register_histogram_vec!(
297 "greptime_servers_postgres_query_elapsed",
298 "servers postgres query elapsed",
299 &[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL],
300 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
301 )
302 .unwrap();
303 pub static ref METRIC_POSTGRES_PREPARED_COUNT: IntCounter = register_int_counter!(
304 "greptime_servers_postgres_prepared_count",
305 "servers postgres prepared count"
306 )
307 .unwrap();
308 pub static ref METRIC_SERVER_GRPC_DB_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
309 "greptime_servers_grpc_db_request_elapsed",
310 "servers grpc db request elapsed",
311 &[METRIC_DB_LABEL, METRIC_TYPE_LABEL, METRIC_CODE_LABEL]
312 )
313 .unwrap();
314 pub static ref METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
315 "greptime_servers_grpc_prom_request_elapsed",
316 "servers grpc prom request elapsed",
317 &[METRIC_DB_LABEL],
318 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
319 )
320 .unwrap();
321 pub static ref METRIC_HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
322 "greptime_servers_http_requests_total",
323 "servers http requests total",
324 &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL, METRIC_DB_LABEL]
325 )
326 .unwrap();
327 pub static ref METRIC_HTTP_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!(
328 "greptime_servers_http_requests_elapsed",
329 "servers http requests elapsed",
330 &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL, METRIC_DB_LABEL],
331 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
332 )
333 .unwrap();
334 pub static ref METRIC_GRPC_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
335 "greptime_servers_grpc_requests_total",
336 "servers grpc requests total",
337 &[METRIC_PATH_LABEL, METRIC_CODE_LABEL]
338 )
339 .unwrap();
340 pub static ref METRIC_GRPC_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!(
341 "greptime_servers_grpc_requests_elapsed",
342 "servers grpc requests elapsed",
343 &[METRIC_PATH_LABEL, METRIC_CODE_LABEL],
344 vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
345 )
346 .unwrap();
347 pub static ref METRIC_JAEGER_QUERY_ELAPSED: HistogramVec = register_histogram_vec!(
348 "greptime_servers_jaeger_query_elapsed",
349 "servers jaeger query elapsed",
350 &[METRIC_DB_LABEL, METRIC_PATH_LABEL]
351 ).unwrap();
352
353 pub static ref GRPC_BULK_INSERT_ELAPSED: Histogram = register_histogram!(
354 "greptime_servers_bulk_insert_elapsed",
355 "servers handle bulk insert elapsed",
356 ).unwrap();
357
358 pub static ref REQUEST_MEMORY_IN_USE: IntGauge = register_int_gauge!(
361 "greptime_servers_request_memory_in_use_bytes",
362 "bytes currently reserved for all concurrent request bodies and messages"
363 ).unwrap();
364
365 pub static ref REQUEST_MEMORY_LIMIT: IntGauge = register_int_gauge!(
367 "greptime_servers_request_memory_limit_bytes",
368 "maximum bytes allowed for all concurrent request bodies and messages"
369 ).unwrap();
370
371 pub static ref REQUEST_MEMORY_REJECTED: IntCounterVec = register_int_counter_vec!(
373 "greptime_servers_request_memory_rejected_total",
374 "number of requests rejected due to memory limit",
375 &["reason"]
376 ).unwrap();
377}
378
/// Stateless [`Layer`] that wraps a service in a [`MetricsMiddleware`].
#[derive(Debug, Clone, Default)]
pub(crate) struct MetricsMiddlewareLayer;
384
385impl<S> Layer<S> for MetricsMiddlewareLayer {
386 type Service = MetricsMiddleware<S>;
387
388 fn layer(&self, service: S) -> Self::Service {
389 MetricsMiddleware { inner: service }
390 }
391}
392
/// Service wrapper that records request count and latency metrics around
/// calls to the wrapped service.
#[derive(Debug, Clone)]
pub(crate) struct MetricsMiddleware<S> {
    /// The wrapped service that actually handles the request.
    inner: S,
}
397
398impl<S> Service<http::Request<Body>> for MetricsMiddleware<S>
399where
400 S: Service<http::Request<Body>, Response = http::Response<Body>> + Clone + Send + 'static,
401 S::Future: Send + 'static,
402{
403 type Response = S::Response;
404 type Error = S::Error;
405 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
406
407 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
408 self.inner.poll_ready(cx)
409 }
410
411 fn call(&mut self, req: http::Request<Body>) -> Self::Future {
412 let clone = self.inner.clone();
416 let mut inner = std::mem::replace(&mut self.inner, clone);
417
418 Box::pin(async move {
419 let start = Instant::now();
420 let path = req.uri().path().to_string();
421
422 let response = inner.call(req).await?;
424
425 let latency = start.elapsed().as_secs_f64();
426 let status = response.status().as_u16().to_string();
427
428 let labels = [path.as_str(), status.as_str()];
429 METRIC_GRPC_REQUESTS_TOTAL.with_label_values(&labels).inc();
430 METRIC_GRPC_REQUESTS_ELAPSED
431 .with_label_values(&labels)
432 .observe(latency);
433
434 Ok(response)
435 })
436 }
437}
438
439pub(crate) async fn http_metrics_layer(req: Request, next: Next) -> impl IntoResponse {
442 let start = Instant::now();
443 let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
444 matched_path.as_str().to_string()
445 } else {
446 req.uri().path().to_string()
447 };
448 let method = req.method().clone();
449
450 let db = req
451 .extensions()
452 .get::<QueryContext>()
453 .map(|ctx| ctx.get_db_string())
454 .unwrap_or_else(|| "unknown".to_string());
455
456 let response = next.run(req).await;
457
458 let latency = start.elapsed().as_secs_f64();
459 let status = response.status();
460 let status = status.as_str();
461 let method_str = method.as_str();
462
463 let labels = [method_str, &path, status, db.as_str()];
464 METRIC_HTTP_REQUESTS_TOTAL.with_label_values(&labels).inc();
465 METRIC_HTTP_REQUESTS_ELAPSED
466 .with_label_values(&labels)
467 .observe(latency);
468
469 response
470}