use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_base::Plugins;
use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
use common_telemetry::{error, info};
use common_time::Timestamp;
use prost::Message;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
use tokio::time::{self, Interval};
use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
use crate::query_handler::PromStoreProtocolHandlerRef;
/// Configuration for exporting this instance's own Prometheus metrics.
///
/// Exactly one of `self_import` / `remote_write` must be set when
/// `enable` is true (enforced in `ExportMetricsTask::try_new`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ExportMetricsOption {
    /// Whether the export-metrics task runs at all.
    pub enable: bool,
    /// Interval between exports; deserialized from humantime strings
    /// (e.g. "30s"). Must be non-zero when enabled.
    #[serde(with = "humantime_serde")]
    pub write_interval: Duration,
    /// Write metrics back into this very instance via the Prom-store
    /// handler; mutually exclusive with `remote_write`.
    pub self_import: Option<SelfImportOption>,
    /// Push metrics to an external Prometheus remote-write endpoint;
    /// mutually exclusive with `self_import`.
    pub remote_write: Option<RemoteWriteOption>,
}
/// Options for exporting metrics via the Prometheus remote-write protocol.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(default)]
pub struct RemoteWriteOption {
    /// Remote-write endpoint URL; validated to be non-empty at task creation.
    pub url: String,
    /// Extra HTTP headers attached to every remote-write request;
    /// names/values are validated at task creation.
    pub headers: HashMap<String, String>,
}
/// Options for importing metrics into this instance itself.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SelfImportOption {
    /// Target database for the imported metrics; validated to be non-empty
    /// at task creation.
    pub db: String,
}
impl Default for SelfImportOption {
fn default() -> Self {
Self {
db: "greptime_metrics".to_string(),
}
}
}
impl Default for ExportMetricsOption {
fn default() -> Self {
Self {
enable: false,
write_interval: Duration::from_secs(30),
self_import: None,
remote_write: None,
}
}
}
/// Background task that periodically exports this instance's metrics,
/// either over the network or through the local Prom-store handler.
#[derive(Default, Clone)]
pub struct ExportMetricsTask {
    // Validated copy of the user configuration.
    config: ExportMetricsOption,
    // Optional plugin-provided filter applied when gathering metrics.
    filter: Option<MetricFilter>,
    // Pre-parsed HTTP headers for the remote-write request path.
    headers: HeaderMap<HeaderValue>,
    /// True when `self_import` is configured, i.e. metrics are written
    /// through a `PromStoreProtocolHandlerRef` rather than over HTTP.
    pub send_by_handler: bool,
}
impl ExportMetricsTask {
    /// Validates `config` and builds the export task.
    ///
    /// Returns `Ok(None)` when exporting is disabled. Fails when the write
    /// interval is zero, when neither or both export methods are configured,
    /// or when the chosen method's options are invalid (empty db/url,
    /// malformed HTTP header names or values).
    pub fn try_new(
        config: &ExportMetricsOption,
        plugins: Option<&Plugins>,
    ) -> Result<Option<Self>> {
        if !config.enable {
            return Ok(None);
        }
        // An optional filter may be injected via plugins to drop unwanted
        // metric families. (`and_then` replaces the former
        // `map(..).unwrap_or(None)` anti-pattern.)
        let filter = plugins.and_then(|p| p.get::<MetricFilter>());
        ensure!(
            config.write_interval.as_secs() != 0,
            InvalidExportMetricsConfigSnafu {
                msg: "Expected export metrics write_interval greater than zero"
            }
        );
        // Exactly one export method must be configured.
        ensure!(
            (config.remote_write.is_none() && config.self_import.is_some())
                || (config.remote_write.is_some() && config.self_import.is_none()),
            InvalidExportMetricsConfigSnafu {
                msg: "Only one of `self_import` or `remote_write` can be used as the export method"
            }
        );
        if let Some(self_import) = &config.self_import {
            ensure!(
                !self_import.db.is_empty(),
                InvalidExportMetricsConfigSnafu {
                    msg: "Expected `self_import` metrics `db` not empty"
                }
            );
        }
        // Pre-validate and convert the user-supplied headers once, so bad
        // headers fail at startup instead of on every write.
        let mut headers = HeaderMap::new();
        if let Some(remote_write) = &config.remote_write {
            ensure!(
                !remote_write.url.is_empty(),
                InvalidExportMetricsConfigSnafu {
                    msg: "Expected `remote_write` metrics `url` not empty"
                }
            );
            remote_write.headers.iter().try_for_each(|(k, v)| {
                let header = match TryInto::<HeaderName>::try_into(k) {
                    Ok(header) => header,
                    Err(_) => {
                        return InvalidExportMetricsConfigSnafu {
                            msg: format!("Export metrics: invalid HTTP header name: {}", k),
                        }
                        .fail()
                    }
                };
                match TryInto::<HeaderValue>::try_into(v) {
                    Ok(value) => headers.insert(header, value),
                    Err(_) => {
                        return InvalidExportMetricsConfigSnafu {
                            msg: format!("Export metrics: invalid HTTP header value: {}", v),
                        }
                        .fail()
                    }
                };
                Ok(())
            })?;
        }
        Ok(Some(Self {
            config: config.clone(),
            filter,
            headers,
            send_by_handler: config.self_import.is_some(),
        }))
    }

    /// Spawns the background export loop on the global runtime.
    ///
    /// `handler` must be `Some` when `self_import` is configured (only
    /// `frontend`/`standalone` own a Prom-store handler); it is ignored
    /// in `remote_write` mode.
    pub fn start(&self, handler: Option<PromStoreProtocolHandlerRef>) -> Result<()> {
        if !self.config.enable {
            return Ok(());
        }
        let interval = time::interval(self.config.write_interval);
        let filter = self.filter.clone();
        let _handle = if let Some(self_import) = &self.config.self_import {
            ensure!(
                handler.is_some(),
                InvalidExportMetricsConfigSnafu {
                    msg: "Only `frontend` or `standalone` can use `self_import` as export method."
                }
            );
            common_runtime::spawn_global(write_system_metric_by_handler(
                self_import.db.clone(),
                // Safe: presence checked by the `ensure!` just above.
                handler.unwrap(),
                filter,
                interval,
            ))
        } else if let Some(remote_write) = &self.config.remote_write {
            common_runtime::spawn_global(write_system_metric_by_network(
                self.headers.clone(),
                remote_write.url.clone(),
                filter,
                interval,
            ))
        } else {
            // try_new guarantees exactly one export method is configured.
            unreachable!()
        };
        Ok(())
    }
}
/// Endless loop that, on every interval tick, gathers the process's
/// Prometheus metrics and pushes them to a remote-write `endpoint` over HTTP.
///
/// Failures (compression or network) are logged and never abort the loop.
pub async fn write_system_metric_by_network(
    headers: HeaderMap,
    endpoint: String,
    filter: Option<MetricFilter>,
    mut interval: Interval,
) {
    info!(
        "Start export metrics task to endpoint: {}, interval: {}s",
        endpoint,
        interval.period().as_secs()
    );
    // The first tick of a tokio interval completes immediately; skip it.
    interval.tick().await;
    let client = reqwest::Client::new();
    loop {
        interval.tick().await;
        let write_request = convert_metric_to_write_request(
            prometheus::gather(),
            filter.as_ref(),
            Timestamp::current_millis().value(),
        );
        // Snappy-compress the protobuf payload, then POST it with the
        // standard remote-write headers plus any user-configured ones.
        let outcome = match snappy_compress(&write_request.encode_to_vec()) {
            Err(e) => Err(e),
            Ok(body) => client
                .post(endpoint.as_str())
                .header("X-Prometheus-Remote-Write-Version", "0.1.0")
                .header("Content-Type", "application/x-protobuf")
                .headers(headers.clone())
                .body(body)
                .send()
                .await
                .context(SendPromRemoteRequestSnafu),
        };
        match outcome {
            Err(e) => error!(e; "report export metrics failed"),
            Ok(resp) if !resp.status().is_success() => {
                error!("report export metrics error, msg: {:#?}", resp);
            }
            Ok(_) => {}
        }
    }
}
pub async fn write_system_metric_by_handler(
db: String,
handler: PromStoreProtocolHandlerRef,
filter: Option<MetricFilter>,
mut interval: Interval,
) {
info!(
"Start export metrics task by handler, interval: {}s",
interval.period().as_secs()
);
interval.tick().await;
let ctx = Arc::new(QueryContextBuilder::default().current_schema(db).build());
loop {
interval.tick().await;
let metric_families = prometheus::gather();
let request = convert_metric_to_write_request(
metric_families,
filter.as_ref(),
Timestamp::current_millis().value(),
);
let (requests, samples) = match to_grpc_row_insert_requests(&request) {
Ok((requests, samples)) => (requests, samples),
Err(e) => {
error!(e; "Failed to convert gathered metrics to RowInsertRequests");
continue;
}
};
if let Err(e) = handler.write(requests, ctx.clone(), false).await {
error!(e; "report export metrics by handler failed");
} else {
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
}
}
}
#[cfg(test)]
mod test {
    use std::time::Duration;

    use crate::export_metrics::{
        ExportMetricsOption, ExportMetricsTask, RemoteWriteOption, SelfImportOption,
    };

    // Exercises every validation rule in `ExportMetricsTask::try_new` and
    // the handler requirement in `start`.
    #[tokio::test]
    async fn test_config() {
        // Zero write_interval is rejected.
        assert!(ExportMetricsTask::try_new(
            &ExportMetricsOption {
                enable: true,
                write_interval: Duration::from_secs(0),
                ..Default::default()
            },
            None
        )
        .is_err());
        // Enabled with neither export method configured is rejected.
        assert!(ExportMetricsTask::try_new(
            &ExportMetricsOption {
                enable: true,
                ..Default::default()
            },
            None
        )
        .is_err());
        // Configuring both export methods at once is rejected.
        assert!(ExportMetricsTask::try_new(
            &ExportMetricsOption {
                enable: true,
                self_import: Some(SelfImportOption::default()),
                remote_write: Some(RemoteWriteOption::default()),
                ..Default::default()
            },
            None
        )
        .is_err());
        // `self_import` with an empty db name is rejected.
        assert!(ExportMetricsTask::try_new(
            &ExportMetricsOption {
                enable: true,
                self_import: Some(SelfImportOption {
                    db: String::default()
                }),
                remote_write: None,
                ..Default::default()
            },
            None
        )
        .is_err());
        // `remote_write` with an empty url is rejected.
        assert!(ExportMetricsTask::try_new(
            &ExportMetricsOption {
                enable: true,
                self_import: None,
                remote_write: Some(RemoteWriteOption {
                    url: String::default(),
                    ..Default::default()
                }),
                ..Default::default()
            },
            None
        )
        .is_err());
        // Valid `self_import` config builds a task, but starting it
        // without a Prom-store handler fails.
        let s = ExportMetricsTask::try_new(
            &ExportMetricsOption {
                enable: true,
                self_import: Some(SelfImportOption::default()),
                ..Default::default()
            },
            None,
        )
        .unwrap()
        .unwrap();
        assert!(s.start(None).is_err());
    }
}