common_greptimedb_telemetry/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::io::ErrorKind;
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use common_runtime::error::{Error, Result};
23use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
24use common_telemetry::{debug, info};
25use common_version::build_info;
26use reqwest::{Client, Response};
27use serde::{Deserialize, Serialize};
28
/// Endpoint that receives the telemetry reports.
pub const TELEMETRY_URL: &str = "https://telemetry.greptimestats.com/db/otel/statistics";
/// Name of the file in which the local installation uuid is cached.
const UUID_FILE_NAME: &str = ".greptimedb-telemetry-uuid";

/// Default interval between two telemetry reports (30 minutes).
pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(30 * 60);
/// Connect timeout of the HTTP client used for reporting (10 seconds).
const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Request timeout of the HTTP client used for reporting (10 seconds).
const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
40
41pub enum GreptimeDBTelemetryTask {
42    Enable((RepeatedTask<Error>, Arc<AtomicBool>)),
43    Disable,
44}
45
46impl GreptimeDBTelemetryTask {
47    pub fn should_report(&self, value: bool) {
48        match self {
49            GreptimeDBTelemetryTask::Enable((_, should_report)) => {
50                should_report.store(value, Ordering::Relaxed);
51            }
52            GreptimeDBTelemetryTask::Disable => {}
53        }
54    }
55
56    pub fn enable(
57        interval: Duration,
58        task_fn: BoxedTaskFunction<Error>,
59        should_report: Arc<AtomicBool>,
60    ) -> Self {
61        GreptimeDBTelemetryTask::Enable((
62            RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
63            should_report,
64        ))
65    }
66
67    pub fn disable() -> Self {
68        GreptimeDBTelemetryTask::Disable
69    }
70
71    pub fn start(&self) -> Result<()> {
72        match self {
73            GreptimeDBTelemetryTask::Enable((task, _)) => {
74                print_anonymous_usage_data_disclaimer();
75                task.start(common_runtime::global_runtime())
76            }
77            GreptimeDBTelemetryTask::Disable => Ok(()),
78        }
79    }
80
81    pub async fn stop(&self) -> Result<()> {
82        match self {
83            GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await,
84            GreptimeDBTelemetryTask::Disable => Ok(()),
85        }
86    }
87}
88
89/// Telemetry data to report
90#[derive(Serialize, Deserialize, Debug)]
91struct StatisticData {
92    /// Operating system name, such as `linux`, `windows` etc.
93    pub os: String,
94    /// The greptimedb version
95    pub version: String,
96    /// The architecture of the CPU, such as `x86`, `x86_64` etc.
97    pub arch: String,
98    /// The running mode, `standalone` or `distributed`.
99    pub mode: Mode,
100    /// The git commit revision of greptimedb
101    pub git_commit: String,
102    /// The node number
103    pub nodes: Option<i32>,
104    /// The local installation uuid
105    pub uuid: String,
106}
107
108#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
109#[serde(rename_all = "lowercase")]
110pub enum Mode {
111    Distributed,
112    Standalone,
113}
114
115#[async_trait::async_trait]
116pub trait Collector {
117    fn get_version(&self) -> String {
118        build_info().version.to_string()
119    }
120
121    fn get_git_hash(&self) -> String {
122        build_info().commit.to_string()
123    }
124
125    fn get_os(&self) -> String {
126        env::consts::OS.to_string()
127    }
128
129    fn get_arch(&self) -> String {
130        env::consts::ARCH.to_string()
131    }
132
133    fn get_mode(&self) -> Mode;
134
135    fn get_retry(&self) -> i32;
136
137    fn inc_retry(&mut self);
138
139    fn set_uuid_cache(&mut self, uuid: String);
140
141    fn get_uuid_cache(&self) -> Option<String>;
142
143    async fn get_nodes(&self) -> Option<i32>;
144
145    fn get_uuid(&mut self, working_home: &Option<String>) -> Option<String> {
146        match self.get_uuid_cache() {
147            Some(uuid) => Some(uuid),
148            None => {
149                if self.get_retry() > 3 {
150                    return None;
151                }
152                match default_get_uuid(working_home) {
153                    Some(uuid) => {
154                        self.set_uuid_cache(uuid.clone());
155                        Some(uuid)
156                    }
157                    None => {
158                        self.inc_retry();
159                        None
160                    }
161                }
162            }
163        }
164    }
165}
166
167fn print_anonymous_usage_data_disclaimer() {
168    info!("Attention: GreptimeDB now collects anonymous usage data to help improve its roadmap and prioritize features.");
169    info!(
170        "To learn more about this anonymous program and how to deactivate it if you don't want to participate, please visit the following URL: ");
171    info!("https://docs.greptime.com/reference/telemetry");
172}
173
174pub fn default_get_uuid(working_home: &Option<String>) -> Option<String> {
175    let temp_dir = env::temp_dir();
176
177    let mut path = PathBuf::new();
178    path.push(
179        working_home
180            .as_ref()
181            .map(Path::new)
182            .unwrap_or_else(|| temp_dir.as_path()),
183    );
184    path.push(UUID_FILE_NAME);
185
186    let path = path.as_path();
187    match std::fs::read(path) {
188        Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
189        Err(e) => {
190            if e.kind() == ErrorKind::NotFound {
191                let uuid = uuid::Uuid::new_v4().to_string();
192                let _ = std::fs::write(path, uuid.as_bytes());
193                Some(uuid)
194            } else {
195                None
196            }
197        }
198    }
199}
200
201/// Report version info to GreptimeDB.
202///
203/// We do not collect any identity-sensitive information.
204/// This task is scheduled to run every 30 minutes.
205/// The task will be disabled default. It can be enabled by setting the build feature `greptimedb-telemetry`
206/// Collector is used to collect the version info. It can be implemented by different components.
207/// client is used to send the HTTP request to GreptimeDB.
208/// telemetry_url is the GreptimeDB url.
209pub struct GreptimeDBTelemetry {
210    statistics: Box<dyn Collector + Send + Sync>,
211    client: Option<Client>,
212    working_home: Option<String>,
213    telemetry_url: &'static str,
214    should_report: Arc<AtomicBool>,
215    report_times: usize,
216}
217
218#[async_trait::async_trait]
219impl TaskFunction<Error> for GreptimeDBTelemetry {
220    fn name(&self) -> &str {
221        "Greptimedb-telemetry-task"
222    }
223
224    async fn call(&mut self) -> Result<()> {
225        if self.should_report.load(Ordering::Relaxed) {
226            self.report_telemetry_info().await;
227        }
228        Ok(())
229    }
230}
231
232impl GreptimeDBTelemetry {
233    pub fn new(
234        working_home: Option<String>,
235        statistics: Box<dyn Collector + Send + Sync>,
236        should_report: Arc<AtomicBool>,
237    ) -> Self {
238        let client = Client::builder()
239            .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
240            .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT)
241            .build();
242        Self {
243            working_home,
244            statistics,
245            client: client.ok(),
246            telemetry_url: TELEMETRY_URL,
247            should_report,
248            report_times: 0,
249        }
250    }
251
252    pub async fn report_telemetry_info(&mut self) -> Option<Response> {
253        match self.statistics.get_uuid(&self.working_home) {
254            Some(uuid) => {
255                let data = StatisticData {
256                    os: self.statistics.get_os(),
257                    version: self.statistics.get_version(),
258                    git_commit: self.statistics.get_git_hash(),
259                    arch: self.statistics.get_arch(),
260                    mode: self.statistics.get_mode(),
261                    nodes: self.statistics.get_nodes().await,
262                    uuid,
263                };
264
265                if let Some(client) = self.client.as_ref() {
266                    if self.report_times == 0 {
267                        info!("reporting greptimedb version: {:?}", data);
268                    }
269                    let result = client.post(self.telemetry_url).json(&data).send().await;
270                    self.report_times += 1;
271                    debug!("report version result: {:?}", result);
272                    result.ok()
273                } else {
274                    None
275                }
276            }
277            None => None,
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use std::convert::Infallible;
    use std::env;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use common_test_util::ports;
    use common_version::build_info;
    use hyper::service::{make_service_fn, service_fn};
    use hyper::Server;
    use reqwest::{Client, Response};
    use tokio::spawn;

    use crate::{default_get_uuid, Collector, GreptimeDBTelemetry, Mode, StatisticData};

    /// Number of requests the echo server received on any path other than `/req-cnt`.
    static COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Handler of the local test server.
    ///
    /// `/req-cnt` answers with the current value of [`COUNT`]; every other path
    /// increments [`COUNT`] and echoes the request body back.
    async fn echo(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
        let path = req.uri().path();
        if path == "/req-cnt" {
            let body = hyper::Body::from(COUNT.load(Ordering::SeqCst).to_string());
            Ok(hyper::Response::new(body))
        } else {
            COUNT.fetch_add(1, Ordering::SeqCst);
            Ok(hyper::Response::new(req.into_body()))
        }
    }

    /// Sends one report through a working collector and one through a failing
    /// collector against a local echo server, then checks that exactly one
    /// request reached the server.
    #[tokio::test]
    async fn test_gretimedb_telemetry() {
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let port: u16 = ports::get_port() as u16;
        spawn(async move {
            let make_svc = make_service_fn(|_conn| {
                // This is the `Service` that will handle the connection.
                // `service_fn` is a helper to convert a function that
                // returns a Response into a `Service`.
                async { Ok::<_, Infallible>(service_fn(echo)) }
            });
            let addr = ([127, 0, 0, 1], port).into();

            let server = Server::try_bind(&addr).unwrap().serve(make_svc);
            // The server shuts down once the test signals completion through `tx`.
            let graceful = server.with_graceful_shutdown(async {
                rx.await.ok();
            });
            let _ = graceful.await;
            Ok::<_, Infallible>(())
        });

        /// Collector that always yields a uuid, so a report is sent.
        struct TestStatistic;

        /// Collector that never yields a uuid, so no report is sent.
        struct FailedStatistic;

        #[async_trait::async_trait]
        impl Collector for TestStatistic {
            fn get_mode(&self) -> Mode {
                Mode::Standalone
            }

            async fn get_nodes(&self) -> Option<i32> {
                Some(1)
            }

            fn get_retry(&self) -> i32 {
                unimplemented!()
            }

            fn inc_retry(&mut self) {
                unimplemented!()
            }

            fn set_uuid_cache(&mut self, _: String) {
                unimplemented!()
            }

            fn get_uuid_cache(&self) -> Option<String> {
                unimplemented!()
            }

            fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
                Some("test".to_string())
            }
        }

        #[async_trait::async_trait]
        impl Collector for FailedStatistic {
            fn get_mode(&self) -> Mode {
                Mode::Standalone
            }

            async fn get_nodes(&self) -> Option<i32> {
                None
            }

            fn get_retry(&self) -> i32 {
                unimplemented!()
            }

            fn inc_retry(&mut self) {
                unimplemented!()
            }

            fn set_uuid_cache(&mut self, _: String) {
                unimplemented!()
            }

            fn get_uuid_cache(&self) -> Option<String> {
                unimplemented!()
            }

            fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
                None
            }
        }

        /// Points `report` at `url` and sends a single report.
        async fn get_telemetry_report(
            mut report: GreptimeDBTelemetry,
            url: &'static str,
        ) -> Option<Response> {
            report.telemetry_url = url;
            report.report_telemetry_info().await
        }

        let working_home_temp = tempfile::Builder::new()
            .prefix("greptimedb_telemetry")
            .tempdir()
            .unwrap();
        let working_home = working_home_temp.path().to_str().unwrap().to_string();

        // `telemetry_url` is a `&'static str`. Leaking the formatted URL yields a
        // genuinely 'static string without resorting to an unsound lifetime
        // transmute; the few leaked bytes are irrelevant in a test.
        let url: &'static str = Box::leak(format!("http://localhost:{}", port).into_boxed_str());

        let test_statistic = Box::new(TestStatistic);
        let test_report = GreptimeDBTelemetry::new(
            Some(working_home.clone()),
            test_statistic,
            Arc::new(AtomicBool::new(true)),
        );
        let response = get_telemetry_report(test_report, url).await.unwrap();

        // The server echoes the request body, so the response is the sent payload.
        let body = response.json::<StatisticData>().await.unwrap();
        assert_eq!(env::consts::ARCH, body.arch);
        assert_eq!(env::consts::OS, body.os);
        assert_eq!(build_info().version, body.version);
        assert_eq!(build_info().commit, body.git_commit);
        assert_eq!(Mode::Standalone, body.mode);
        assert_eq!(1, body.nodes.unwrap());

        let failed_statistic = Box::new(FailedStatistic);
        let failed_report = GreptimeDBTelemetry::new(
            Some(working_home),
            failed_statistic,
            Arc::new(AtomicBool::new(true)),
        );
        let response = get_telemetry_report(failed_report, url).await;
        assert!(response.is_none());

        let client = Client::builder()
            .connect_timeout(Duration::from_secs(3))
            .timeout(Duration::from_secs(3))
            .build()
            .unwrap();

        // Only the successful report may have reached the server.
        let cnt_url = format!("{}/req-cnt", url);
        let response = client.get(cnt_url).send().await.unwrap();
        let body = response.text().await.unwrap();
        assert_eq!("1", body);
        tx.send(()).unwrap();
    }

    /// The uuid is created on first use and stays the same on later reads.
    #[test]
    fn test_get_uuid() {
        let working_home_temp = tempfile::Builder::new()
            .prefix("greptimedb_telemetry")
            .tempdir()
            .unwrap();
        let working_home = working_home_temp.path().to_str().unwrap().to_string();

        let uuid = default_get_uuid(&Some(working_home.clone()));
        assert!(uuid.is_some());
        assert_eq!(uuid, default_get_uuid(&Some(working_home.clone())));
        assert_eq!(uuid, default_get_uuid(&Some(working_home)));
    }
}