servers/
export_metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::Plugins;
20use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
21use common_telemetry::{error, info};
22use common_time::Timestamp;
23use prost::Message;
24use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
25use serde::{Deserialize, Serialize};
26use session::context::QueryContextBuilder;
27use snafu::{ensure, ResultExt};
28use tokio::time::{self, Interval};
29
30use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
31use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
32use crate::query_handler::PromStoreProtocolHandlerRef;
33
34/// Use to export the metrics generated by greptimedb.
35///
36/// Encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
37/// and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself)
38#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
39#[serde(default)]
40pub struct ExportMetricsOption {
41    pub enable: bool,
42    #[serde(with = "humantime_serde")]
43    pub write_interval: Duration,
44    pub self_import: Option<SelfImportOption>,
45    pub remote_write: Option<RemoteWriteOption>,
46}
47
48#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
49#[serde(default)]
50pub struct RemoteWriteOption {
51    pub url: String,
52    pub headers: HashMap<String, String>,
53}
54
55#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
56#[serde(default)]
57pub struct SelfImportOption {
58    pub db: String,
59}
60
61impl Default for SelfImportOption {
62    fn default() -> Self {
63        Self {
64            db: "greptime_metrics".to_string(),
65        }
66    }
67}
68
69impl Default for ExportMetricsOption {
70    fn default() -> Self {
71        Self {
72            enable: false,
73            write_interval: Duration::from_secs(30),
74            self_import: None,
75            remote_write: None,
76        }
77    }
78}
79
80#[derive(Default, Clone)]
81pub struct ExportMetricsTask {
82    config: ExportMetricsOption,
83    filter: Option<MetricFilter>,
84    headers: HeaderMap<HeaderValue>,
85    pub send_by_handler: bool,
86}
87
88impl ExportMetricsTask {
89    pub fn try_new(
90        config: &ExportMetricsOption,
91        plugins: Option<&Plugins>,
92    ) -> Result<Option<Self>> {
93        if !config.enable {
94            return Ok(None);
95        }
96        let filter = plugins.map(|p| p.get::<MetricFilter>()).unwrap_or(None);
97        ensure!(
98            config.write_interval.as_secs() != 0,
99            InvalidExportMetricsConfigSnafu {
100                msg: "Expected export metrics write_interval greater than zero"
101            }
102        );
103        ensure!(
104            (config.remote_write.is_none() && config.self_import.is_some())
105                || (config.remote_write.is_some() && config.self_import.is_none()),
106            InvalidExportMetricsConfigSnafu {
107                msg: "Only one of `self_import` or `remote_write` can be used as the export method"
108            }
109        );
110        if let Some(self_import) = &config.self_import {
111            ensure!(
112                !self_import.db.is_empty(),
113                InvalidExportMetricsConfigSnafu {
114                    msg: "Expected `self_import` metrics `db` not empty"
115                }
116            );
117        }
118        let mut headers = HeaderMap::new();
119        if let Some(remote_write) = &config.remote_write {
120            ensure!(
121                !remote_write.url.is_empty(),
122                InvalidExportMetricsConfigSnafu {
123                    msg: "Expected `remote_write` metrics `url` not empty"
124                }
125            );
126            // construct http header
127            remote_write.headers.iter().try_for_each(|(k, v)| {
128                let header = match TryInto::<HeaderName>::try_into(k) {
129                    Ok(header) => header,
130                    Err(_) => {
131                        return InvalidExportMetricsConfigSnafu {
132                            msg: format!("Export metrics: invalid HTTP header name: {}", k),
133                        }
134                        .fail()
135                    }
136                };
137                match TryInto::<HeaderValue>::try_into(v) {
138                    Ok(value) => headers.insert(header, value),
139                    Err(_) => {
140                        return InvalidExportMetricsConfigSnafu {
141                            msg: format!("Export metrics: invalid HTTP header value: {}", v),
142                        }
143                        .fail()
144                    }
145                };
146                Ok(())
147            })?;
148        }
149        Ok(Some(Self {
150            config: config.clone(),
151            filter,
152            headers,
153            send_by_handler: config.self_import.is_some(),
154        }))
155    }
156
157    pub fn start(&self, handler: Option<PromStoreProtocolHandlerRef>) -> Result<()> {
158        if !self.config.enable {
159            return Ok(());
160        }
161        let interval = time::interval(self.config.write_interval);
162        let filter = self.filter.clone();
163        let _handle = if let Some(self_import) = &self.config.self_import {
164            ensure!(
165                handler.is_some(),
166                InvalidExportMetricsConfigSnafu {
167                    msg: "Only `frontend` or `standalone` can use `self_import` as export method."
168                }
169            );
170            common_runtime::spawn_global(write_system_metric_by_handler(
171                self_import.db.clone(),
172                handler.unwrap(),
173                filter,
174                interval,
175            ))
176        } else if let Some(remote_write) = &self.config.remote_write {
177            common_runtime::spawn_global(write_system_metric_by_network(
178                self.headers.clone(),
179                remote_write.url.clone(),
180                filter,
181                interval,
182            ))
183        } else {
184            unreachable!()
185        };
186        Ok(())
187    }
188}
189
190/// Send metrics collected by standard Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/)
191pub async fn write_system_metric_by_network(
192    headers: HeaderMap,
193    endpoint: String,
194    filter: Option<MetricFilter>,
195    mut interval: Interval,
196) {
197    info!(
198        "Start export metrics task to endpoint: {}, interval: {}s",
199        endpoint,
200        interval.period().as_secs()
201    );
202    // Pass the first tick. Because the first tick completes immediately.
203    interval.tick().await;
204    let client = reqwest::Client::new();
205    loop {
206        interval.tick().await;
207        let metric_families = prometheus::gather();
208        let request = convert_metric_to_write_request(
209            metric_families,
210            filter.as_ref(),
211            Timestamp::current_millis().value(),
212        );
213        let resp = match snappy_compress(&request.encode_to_vec()) {
214            Ok(body) => client
215                .post(endpoint.as_str())
216                .header("X-Prometheus-Remote-Write-Version", "0.1.0")
217                .header("Content-Type", "application/x-protobuf")
218                .headers(headers.clone())
219                .body(body)
220                .send()
221                .await
222                .context(SendPromRemoteRequestSnafu),
223            Err(e) => Err(e),
224        };
225        match resp {
226            Ok(resp) => {
227                if !resp.status().is_success() {
228                    error!("report export metrics error, msg: {:#?}", resp);
229                }
230            }
231            Err(e) => error!(e; "report export metrics failed"),
232        };
233    }
234}
235
236/// Send metrics collected by our internal handler
237/// for case `frontend` and `standalone` dispose it's own metrics,
238/// reducing compression and network transmission overhead.
239pub async fn write_system_metric_by_handler(
240    db: String,
241    handler: PromStoreProtocolHandlerRef,
242    filter: Option<MetricFilter>,
243    mut interval: Interval,
244) {
245    info!(
246        "Start export metrics task by handler, interval: {}s",
247        interval.period().as_secs()
248    );
249    // Pass the first tick. Because the first tick completes immediately.
250    interval.tick().await;
251    let ctx = Arc::new(QueryContextBuilder::default().current_schema(db).build());
252    loop {
253        interval.tick().await;
254        let metric_families = prometheus::gather();
255        let request = convert_metric_to_write_request(
256            metric_families,
257            filter.as_ref(),
258            Timestamp::current_millis().value(),
259        );
260
261        let (requests, samples) = match to_grpc_row_insert_requests(&request) {
262            Ok((requests, samples)) => (requests, samples),
263            Err(e) => {
264                error!(e; "Failed to convert gathered metrics to RowInsertRequests");
265                continue;
266            }
267        };
268
269        if let Err(e) = handler.write(requests, ctx.clone(), false).await {
270            error!(e; "report export metrics by handler failed");
271        } else {
272            crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
273        }
274    }
275}
276
#[cfg(test)]
mod test {
    use std::time::Duration;

    use crate::export_metrics::{
        ExportMetricsOption, ExportMetricsTask, RemoteWriteOption, SelfImportOption,
    };

    /// Covers the validation done by `ExportMetricsTask::try_new` and the missing-handler check
    /// of `ExportMetricsTask::start`.
    #[tokio::test]
    async fn test_config() {
        // Every option below is enabled but malformed, so building a task from it must fail.
        let rejected = [
            (
                "zero write_interval",
                ExportMetricsOption {
                    enable: true,
                    write_interval: Duration::from_secs(0),
                    ..Default::default()
                },
            ),
            (
                "none self_import and remote_write",
                ExportMetricsOption {
                    enable: true,
                    ..Default::default()
                },
            ),
            (
                "both self_import and remote_write",
                ExportMetricsOption {
                    enable: true,
                    self_import: Some(SelfImportOption::default()),
                    remote_write: Some(RemoteWriteOption::default()),
                    ..Default::default()
                },
            ),
            (
                "empty db",
                ExportMetricsOption {
                    enable: true,
                    self_import: Some(SelfImportOption {
                        db: String::default(),
                    }),
                    remote_write: None,
                    ..Default::default()
                },
            ),
            (
                "empty url",
                ExportMetricsOption {
                    enable: true,
                    self_import: None,
                    remote_write: Some(RemoteWriteOption {
                        url: String::default(),
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            ),
        ];
        for (case, option) in &rejected {
            assert!(
                ExportMetricsTask::try_new(option, None).is_err(),
                "config case `{}` should have been rejected",
                case
            );
        }

        // A valid self-import config builds fine, but starting it without a handler must fail.
        let self_import_only = ExportMetricsOption {
            enable: true,
            self_import: Some(SelfImportOption::default()),
            ..Default::default()
        };
        let task = ExportMetricsTask::try_new(&self_import_only, None)
            .unwrap()
            .unwrap();
        assert!(task.start(None).is_err());
    }
}
355        assert!(s.start(None).is_err());
356    }
357}