1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::Plugins;
20use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
21use common_telemetry::{error, info};
22use common_time::Timestamp;
23use prost::Message;
24use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
25use serde::{Deserialize, Serialize};
26use session::context::QueryContextBuilder;
27use snafu::{ensure, ResultExt};
28use tokio::time::{self, Interval};
29
30use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
31use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
32use crate::query_handler::PromStoreProtocolHandlerRef;
33
34#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
39#[serde(default)]
40pub struct ExportMetricsOption {
41 pub enable: bool,
42 #[serde(with = "humantime_serde")]
43 pub write_interval: Duration,
44 pub self_import: Option<SelfImportOption>,
45 pub remote_write: Option<RemoteWriteOption>,
46}
47
48#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
49#[serde(default)]
50pub struct RemoteWriteOption {
51 pub url: String,
52 pub headers: HashMap<String, String>,
53}
54
55#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
56#[serde(default)]
57pub struct SelfImportOption {
58 pub db: String,
59}
60
61impl Default for SelfImportOption {
62 fn default() -> Self {
63 Self {
64 db: "greptime_metrics".to_string(),
65 }
66 }
67}
68
69impl Default for ExportMetricsOption {
70 fn default() -> Self {
71 Self {
72 enable: false,
73 write_interval: Duration::from_secs(30),
74 self_import: None,
75 remote_write: None,
76 }
77 }
78}
79
80#[derive(Default, Clone)]
81pub struct ExportMetricsTask {
82 config: ExportMetricsOption,
83 filter: Option<MetricFilter>,
84 headers: HeaderMap<HeaderValue>,
85 pub send_by_handler: bool,
86}
87
88impl ExportMetricsTask {
89 pub fn try_new(
90 config: &ExportMetricsOption,
91 plugins: Option<&Plugins>,
92 ) -> Result<Option<Self>> {
93 if !config.enable {
94 return Ok(None);
95 }
96 let filter = plugins.map(|p| p.get::<MetricFilter>()).unwrap_or(None);
97 ensure!(
98 config.write_interval.as_secs() != 0,
99 InvalidExportMetricsConfigSnafu {
100 msg: "Expected export metrics write_interval greater than zero"
101 }
102 );
103 ensure!(
104 (config.remote_write.is_none() && config.self_import.is_some())
105 || (config.remote_write.is_some() && config.self_import.is_none()),
106 InvalidExportMetricsConfigSnafu {
107 msg: "Only one of `self_import` or `remote_write` can be used as the export method"
108 }
109 );
110 if let Some(self_import) = &config.self_import {
111 ensure!(
112 !self_import.db.is_empty(),
113 InvalidExportMetricsConfigSnafu {
114 msg: "Expected `self_import` metrics `db` not empty"
115 }
116 );
117 }
118 let mut headers = HeaderMap::new();
119 if let Some(remote_write) = &config.remote_write {
120 ensure!(
121 !remote_write.url.is_empty(),
122 InvalidExportMetricsConfigSnafu {
123 msg: "Expected `remote_write` metrics `url` not empty"
124 }
125 );
126 remote_write.headers.iter().try_for_each(|(k, v)| {
128 let header = match TryInto::<HeaderName>::try_into(k) {
129 Ok(header) => header,
130 Err(_) => {
131 return InvalidExportMetricsConfigSnafu {
132 msg: format!("Export metrics: invalid HTTP header name: {}", k),
133 }
134 .fail()
135 }
136 };
137 match TryInto::<HeaderValue>::try_into(v) {
138 Ok(value) => headers.insert(header, value),
139 Err(_) => {
140 return InvalidExportMetricsConfigSnafu {
141 msg: format!("Export metrics: invalid HTTP header value: {}", v),
142 }
143 .fail()
144 }
145 };
146 Ok(())
147 })?;
148 }
149 Ok(Some(Self {
150 config: config.clone(),
151 filter,
152 headers,
153 send_by_handler: config.self_import.is_some(),
154 }))
155 }
156
157 pub fn start(&self, handler: Option<PromStoreProtocolHandlerRef>) -> Result<()> {
158 if !self.config.enable {
159 return Ok(());
160 }
161 let interval = time::interval(self.config.write_interval);
162 let filter = self.filter.clone();
163 let _handle = if let Some(self_import) = &self.config.self_import {
164 ensure!(
165 handler.is_some(),
166 InvalidExportMetricsConfigSnafu {
167 msg: "Only `frontend` or `standalone` can use `self_import` as export method."
168 }
169 );
170 common_runtime::spawn_global(write_system_metric_by_handler(
171 self_import.db.clone(),
172 handler.unwrap(),
173 filter,
174 interval,
175 ))
176 } else if let Some(remote_write) = &self.config.remote_write {
177 common_runtime::spawn_global(write_system_metric_by_network(
178 self.headers.clone(),
179 remote_write.url.clone(),
180 filter,
181 interval,
182 ))
183 } else {
184 unreachable!()
185 };
186 Ok(())
187 }
188}
189
190pub async fn write_system_metric_by_network(
192 headers: HeaderMap,
193 endpoint: String,
194 filter: Option<MetricFilter>,
195 mut interval: Interval,
196) {
197 info!(
198 "Start export metrics task to endpoint: {}, interval: {}s",
199 endpoint,
200 interval.period().as_secs()
201 );
202 interval.tick().await;
204 let client = reqwest::Client::new();
205 loop {
206 interval.tick().await;
207 let metric_families = prometheus::gather();
208 let request = convert_metric_to_write_request(
209 metric_families,
210 filter.as_ref(),
211 Timestamp::current_millis().value(),
212 );
213 let resp = match snappy_compress(&request.encode_to_vec()) {
214 Ok(body) => client
215 .post(endpoint.as_str())
216 .header("X-Prometheus-Remote-Write-Version", "0.1.0")
217 .header("Content-Type", "application/x-protobuf")
218 .headers(headers.clone())
219 .body(body)
220 .send()
221 .await
222 .context(SendPromRemoteRequestSnafu),
223 Err(e) => Err(e),
224 };
225 match resp {
226 Ok(resp) => {
227 if !resp.status().is_success() {
228 error!("report export metrics error, msg: {:#?}", resp);
229 }
230 }
231 Err(e) => error!(e; "report export metrics failed"),
232 };
233 }
234}
235
236pub async fn write_system_metric_by_handler(
240 db: String,
241 handler: PromStoreProtocolHandlerRef,
242 filter: Option<MetricFilter>,
243 mut interval: Interval,
244) {
245 info!(
246 "Start export metrics task by handler, interval: {}s",
247 interval.period().as_secs()
248 );
249 interval.tick().await;
251 let ctx = Arc::new(QueryContextBuilder::default().current_schema(db).build());
252 loop {
253 interval.tick().await;
254 let metric_families = prometheus::gather();
255 let request = convert_metric_to_write_request(
256 metric_families,
257 filter.as_ref(),
258 Timestamp::current_millis().value(),
259 );
260
261 let (requests, samples) = match to_grpc_row_insert_requests(&request) {
262 Ok((requests, samples)) => (requests, samples),
263 Err(e) => {
264 error!(e; "Failed to convert gathered metrics to RowInsertRequests");
265 continue;
266 }
267 };
268
269 if let Err(e) = handler.write(requests, ctx.clone(), false).await {
270 error!(e; "report export metrics by handler failed");
271 } else {
272 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
273 }
274 }
275}
276
#[cfg(test)]
mod test {
    use std::time::Duration;

    use crate::export_metrics::{
        ExportMetricsOption, ExportMetricsTask, RemoteWriteOption, SelfImportOption,
    };

    /// Checks that invalid export configurations are rejected by `try_new`
    /// and that `start` fails for `self_import` without a handler.
    #[tokio::test]
    async fn test_config() {
        // A zero write interval is rejected.
        let zero_interval = ExportMetricsOption {
            enable: true,
            write_interval: Duration::from_secs(0),
            ..Default::default()
        };
        assert!(ExportMetricsTask::try_new(&zero_interval, None).is_err());

        // Enabled without any export method is rejected.
        let no_method = ExportMetricsOption {
            enable: true,
            ..Default::default()
        };
        assert!(ExportMetricsTask::try_new(&no_method, None).is_err());

        // Both export methods at once are rejected.
        let both_methods = ExportMetricsOption {
            enable: true,
            self_import: Some(SelfImportOption::default()),
            remote_write: Some(RemoteWriteOption::default()),
            ..Default::default()
        };
        assert!(ExportMetricsTask::try_new(&both_methods, None).is_err());

        // `self_import` with an empty database name is rejected.
        let empty_db = ExportMetricsOption {
            enable: true,
            self_import: Some(SelfImportOption {
                db: String::default(),
            }),
            remote_write: None,
            ..Default::default()
        };
        assert!(ExportMetricsTask::try_new(&empty_db, None).is_err());

        // `remote_write` with an empty URL is rejected.
        let empty_url = ExportMetricsOption {
            enable: true,
            self_import: None,
            remote_write: Some(RemoteWriteOption {
                url: String::default(),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert!(ExportMetricsTask::try_new(&empty_url, None).is_err());

        // A valid `self_import` task still cannot start without a handler.
        let self_import_only = ExportMetricsOption {
            enable: true,
            self_import: Some(SelfImportOption::default()),
            ..Default::default()
        };
        let task = ExportMetricsTask::try_new(&self_import_only, None)
            .unwrap()
            .unwrap();
        assert!(task.start(None).is_err());
    }
}