common_greptimedb_telemetry/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::io::ErrorKind;
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, LazyLock};
20use std::time::{Duration, SystemTime};
21
22use common_runtime::error::{Error, Result};
23use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
24use common_telemetry::{debug, info};
25use common_version::build_info;
26use reqwest::{Client, Response};
27use serde::{Deserialize, Serialize};
28
/// The URL to report telemetry data.
pub const TELEMETRY_URL: &str = "https://telemetry.greptimestats.com/db/otel/statistics";
/// Name of the file that caches the local installation uuid.
const UUID_FILE_NAME: &str = ".greptimedb-telemetry-uuid";

/// Reference point for the uptime calculation.
///
/// The value is captured lazily, i.e. when this static is accessed for the
/// first time, which is not necessarily the moment the process started.
static START_TIME: LazyLock<SystemTime> = LazyLock::new(SystemTime::now);

/// The default interval of reporting telemetry data to greptime cloud (30 minutes).
pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(60 * 30);
/// The default connect timeout to greptime cloud.
const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// The default request timeout to greptime cloud.
const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
43
/// Handle for the periodic telemetry reporting task.
pub enum GreptimeDBTelemetryTask {
    /// Telemetry is enabled: carries the repeated task and the shared flag
    /// that [`GreptimeDBTelemetryTask::should_report`] writes to.
    // NOTE(review): the flag is presumably the same `Arc` the task function
    // reads before reporting — confirm at the construction site.
    Enable((RepeatedTask<Error>, Arc<AtomicBool>)),
    /// Telemetry is disabled: starting, stopping and toggling are no-ops.
    Disable,
}
48
49impl GreptimeDBTelemetryTask {
50    pub fn should_report(&self, value: bool) {
51        match self {
52            GreptimeDBTelemetryTask::Enable((_, should_report)) => {
53                should_report.store(value, Ordering::Relaxed);
54            }
55            GreptimeDBTelemetryTask::Disable => {}
56        }
57    }
58
59    pub fn enable(
60        interval: Duration,
61        task_fn: BoxedTaskFunction<Error>,
62        should_report: Arc<AtomicBool>,
63    ) -> Self {
64        GreptimeDBTelemetryTask::Enable((
65            RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
66            should_report,
67        ))
68    }
69
70    pub fn disable() -> Self {
71        GreptimeDBTelemetryTask::Disable
72    }
73
74    pub fn start(&self) -> Result<()> {
75        match self {
76            GreptimeDBTelemetryTask::Enable((task, _)) => {
77                print_anonymous_usage_data_disclaimer();
78                task.start(common_runtime::global_runtime())
79            }
80            GreptimeDBTelemetryTask::Disable => Ok(()),
81        }
82    }
83
84    pub async fn stop(&self) -> Result<()> {
85        match self {
86            GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await,
87            GreptimeDBTelemetryTask::Disable => Ok(()),
88        }
89    }
90}
91
/// Telemetry data to report.
///
/// The struct is serialized with serde and sent as the JSON body of the
/// telemetry request.
#[derive(Serialize, Deserialize, Debug)]
struct StatisticData {
    /// Operating system name, such as `linux`, `windows` etc.
    pub os: String,
    /// The greptimedb version
    pub version: String,
    /// The architecture of the CPU, such as `x86`, `x86_64` etc.
    pub arch: String,
    /// The running mode, `standalone` or `distributed`.
    pub mode: Mode,
    /// The git commit revision of greptimedb
    pub git_commit: String,
    /// The node number, or `None` when the collector cannot provide it.
    pub nodes: Option<i32>,
    /// The local installation uuid
    pub uuid: String,
    /// Coarse uptime range: one of "hours", "days", "weeks", "months" or
    /// "years". It is measured from the crate's `START_TIME` reference point.
    pub uptime: String,
}
112
/// The running mode reported in the telemetry payload.
///
/// Variants are (de)serialized in lowercase, i.e. `distributed` and
/// `standalone`.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
    /// Serialized as `distributed`.
    Distributed,
    /// Serialized as `standalone`.
    Standalone,
}
119
120#[async_trait::async_trait]
121pub trait Collector {
122    fn get_version(&self) -> String {
123        build_info().version.to_string()
124    }
125
126    fn get_git_hash(&self) -> String {
127        build_info().commit.to_string()
128    }
129
130    fn get_os(&self) -> String {
131        env::consts::OS.to_string()
132    }
133
134    fn get_arch(&self) -> String {
135        env::consts::ARCH.to_string()
136    }
137
138    fn get_mode(&self) -> Mode;
139
140    fn get_retry(&self) -> i32;
141
142    fn inc_retry(&mut self);
143
144    fn set_uuid_cache(&mut self, uuid: String);
145
146    fn get_uuid_cache(&self) -> Option<String>;
147
148    async fn get_nodes(&self) -> Option<i32>;
149
150    fn get_uuid(&mut self, working_home: &Option<String>) -> Option<String> {
151        match self.get_uuid_cache() {
152            Some(uuid) => Some(uuid),
153            None => {
154                if self.get_retry() > 3 {
155                    return None;
156                }
157                match default_get_uuid(working_home) {
158                    Some(uuid) => {
159                        self.set_uuid_cache(uuid.clone());
160                        Some(uuid)
161                    }
162                    None => {
163                        self.inc_retry();
164                        None
165                    }
166                }
167            }
168        }
169    }
170}
171
172fn print_anonymous_usage_data_disclaimer() {
173    info!(
174        "Attention: GreptimeDB now collects anonymous usage data to help improve its roadmap and prioritize features."
175    );
176    info!(
177        "To learn more about this anonymous program and how to deactivate it if you don't want to participate, please visit the following URL: "
178    );
179    info!("https://docs.greptime.com/reference/telemetry");
180}
181
182/// Format uptime duration into a general time range string
183/// Returns privacy-friendly descriptions like "hours", "days", etc.
184fn format_uptime() -> String {
185    let uptime_duration = START_TIME.elapsed().unwrap_or(Duration::ZERO);
186    let total_seconds = uptime_duration.as_secs();
187
188    if total_seconds < 86400 {
189        "hours".to_string()
190    } else if total_seconds < 604800 {
191        "days".to_string()
192    } else if total_seconds < 2629746 {
193        "weeks".to_string()
194    } else if total_seconds < 31556952 {
195        "months".to_string()
196    } else {
197        "years".to_string()
198    }
199}
200
201pub fn default_get_uuid(working_home: &Option<String>) -> Option<String> {
202    let temp_dir = env::temp_dir();
203
204    let mut path = PathBuf::new();
205    path.push(
206        working_home
207            .as_ref()
208            .map(Path::new)
209            .unwrap_or_else(|| temp_dir.as_path()),
210    );
211    path.push(UUID_FILE_NAME);
212
213    let path = path.as_path();
214    match std::fs::read(path) {
215        Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
216        Err(e) => {
217            if e.kind() == ErrorKind::NotFound {
218                let uuid = uuid::Uuid::new_v4().to_string();
219                let _ = std::fs::write(path, uuid.as_bytes());
220                Some(uuid)
221            } else {
222                None
223            }
224        }
225    }
226}
227
/// Report version info to GreptimeDB.
///
/// We do not collect any identity-sensitive information.
/// This task is scheduled to run every 30 minutes.
/// The task is disabled by default. It can be enabled by setting the build feature `greptimedb-telemetry`.
/// `statistics` is the [`Collector`] used to gather the version info. It can be implemented by different components.
/// `client` is used to send the HTTP request to GreptimeDB.
/// `telemetry_url` is the GreptimeDB url.
pub struct GreptimeDBTelemetry {
    /// Collector that supplies the reported values.
    statistics: Box<dyn Collector + Send + Sync>,
    /// HTTP client; `None` when the client could not be built, in which case
    /// nothing is ever sent.
    client: Option<Client>,
    /// Directory passed to the collector for resolving the installation uuid.
    working_home: Option<String>,
    /// Endpoint the report is posted to.
    telemetry_url: &'static str,
    /// Flag checked on every task run; reports are sent only while it is `true`.
    should_report: Arc<AtomicBool>,
    /// Number of send attempts so far; the payload is logged at info level
    /// only while this is zero.
    report_times: usize,
}
244
245#[async_trait::async_trait]
246impl TaskFunction<Error> for GreptimeDBTelemetry {
247    fn name(&self) -> &str {
248        "Greptimedb-telemetry-task"
249    }
250
251    async fn call(&mut self) -> Result<()> {
252        if self.should_report.load(Ordering::Relaxed) {
253            self.report_telemetry_info().await;
254        }
255        Ok(())
256    }
257}
258
259impl GreptimeDBTelemetry {
260    pub fn new(
261        working_home: Option<String>,
262        statistics: Box<dyn Collector + Send + Sync>,
263        should_report: Arc<AtomicBool>,
264    ) -> Self {
265        let client = Client::builder()
266            .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
267            .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT)
268            .build();
269        Self {
270            working_home,
271            statistics,
272            client: client.ok(),
273            telemetry_url: TELEMETRY_URL,
274            should_report,
275            report_times: 0,
276        }
277    }
278
279    pub async fn report_telemetry_info(&mut self) -> Option<Response> {
280        match self.statistics.get_uuid(&self.working_home) {
281            Some(uuid) => {
282                let data = StatisticData {
283                    os: self.statistics.get_os(),
284                    version: self.statistics.get_version(),
285                    git_commit: self.statistics.get_git_hash(),
286                    arch: self.statistics.get_arch(),
287                    mode: self.statistics.get_mode(),
288                    nodes: self.statistics.get_nodes().await,
289                    uuid,
290                    uptime: format_uptime(),
291                };
292
293                if let Some(client) = self.client.as_ref() {
294                    if self.report_times == 0 {
295                        info!("reporting greptimedb version: {:?}", data);
296                    }
297                    let result = client.post(self.telemetry_url).json(&data).send().await;
298                    self.report_times += 1;
299                    debug!("report version result: {:?}", result);
300                    result.ok()
301                } else {
302                    None
303                }
304            }
305            None => None,
306        }
307    }
308}
309
#[cfg(test)]
mod tests {
    use std::convert::Infallible;
    use std::env;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize};
    use std::time::Duration;

    use common_test_util::ports;
    use common_version::build_info;
    use hyper::Server;
    use hyper::service::{make_service_fn, service_fn};
    use reqwest::{Client, Response};
    use tokio::spawn;

    use crate::{
        Collector, GreptimeDBTelemetry, Mode, StatisticData, default_get_uuid, format_uptime,
    };

    /// Number of requests the test server received on any path but `/req-cnt`.
    static COUNT: AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    /// Test server handler.
    ///
    /// `/req-cnt` answers with the current value of [`COUNT`]; every other
    /// path increments [`COUNT`] and echoes the request body back.
    async fn echo(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
        let path = req.uri().path();
        if path == "/req-cnt" {
            let body = hyper::Body::from(format!(
                "{}",
                COUNT.load(std::sync::atomic::Ordering::SeqCst)
            ));
            Ok(hyper::Response::new(body))
        } else {
            COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(hyper::Response::new(req.into_body()))
        }
    }

    /// Sends reports to a local echo server and checks both the echoed
    /// payload and the number of requests that actually reached the server.
    #[tokio::test]
    async fn test_gretimedb_telemetry() {
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let port: u16 = ports::get_port() as u16;
        spawn(async move {
            let make_svc = make_service_fn(|_conn| {
                // This is the `Service` that will handle the connection.
                // `service_fn` is a helper to convert a function that
                // returns a Response into a `Service`.
                async { Ok::<_, Infallible>(service_fn(echo)) }
            });
            let addr = ([127, 0, 0, 1], port).into();

            let server = Server::try_bind(&addr).unwrap().serve(make_svc);
            let graceful = server.with_graceful_shutdown(async {
                rx.await.ok();
            });
            let _ = graceful.await;
            Ok::<_, Infallible>(())
        });

        /// Collector whose uuid is always available.
        struct TestStatistic;

        /// Collector whose uuid can never be resolved.
        struct FailedStatistic;

        #[async_trait::async_trait]
        impl Collector for TestStatistic {
            fn get_mode(&self) -> Mode {
                Mode::Standalone
            }

            async fn get_nodes(&self) -> Option<i32> {
                Some(1)
            }

            fn get_retry(&self) -> i32 {
                unimplemented!()
            }

            fn inc_retry(&mut self) {
                unimplemented!()
            }

            fn set_uuid_cache(&mut self, _: String) {
                unimplemented!()
            }

            fn get_uuid_cache(&self) -> Option<String> {
                unimplemented!()
            }

            fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
                Some("test".to_string())
            }
        }

        #[async_trait::async_trait]
        impl Collector for FailedStatistic {
            fn get_mode(&self) -> Mode {
                Mode::Standalone
            }

            async fn get_nodes(&self) -> Option<i32> {
                None
            }

            fn get_retry(&self) -> i32 {
                unimplemented!()
            }

            fn inc_retry(&mut self) {
                unimplemented!()
            }

            fn set_uuid_cache(&mut self, _: String) {
                unimplemented!()
            }

            fn get_uuid_cache(&self) -> Option<String> {
                unimplemented!()
            }

            fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
                None
            }
        }

        /// Points `report` at `url` and performs a single report.
        async fn get_telemetry_report(
            mut report: GreptimeDBTelemetry,
            url: &'static str,
        ) -> Option<Response> {
            report.telemetry_url = url;
            report.report_telemetry_info().await
        }

        let working_home_temp = tempfile::Builder::new()
            .prefix("greptimedb_telemetry")
            .tempdir()
            .unwrap();
        let working_home = working_home_temp.path().to_str().unwrap().to_string();

        // `telemetry_url` is a `&'static str`. Leaking the short URL string
        // yields a genuine `'static` reference without any `unsafe` lifetime
        // extension; the few leaked bytes are irrelevant in a test.
        let url: &'static str =
            Box::leak(format!("http://localhost:{}", port).into_boxed_str());

        let test_statistic = Box::new(TestStatistic);
        let test_report = GreptimeDBTelemetry::new(
            Some(working_home.clone()),
            test_statistic,
            Arc::new(AtomicBool::new(true)),
        );
        let response = get_telemetry_report(test_report, url).await.unwrap();

        // The server echoes the request body, so the response is the payload.
        let body = response.json::<StatisticData>().await.unwrap();
        assert_eq!(env::consts::ARCH, body.arch);
        assert_eq!(env::consts::OS, body.os);
        assert_eq!(build_info().version, body.version);
        assert_eq!(build_info().commit, body.git_commit);
        assert_eq!(Mode::Standalone, body.mode);
        assert_eq!(1, body.nodes.unwrap());
        assert!(!body.uptime.is_empty());

        let failed_statistic = Box::new(FailedStatistic);
        let failed_report = GreptimeDBTelemetry::new(
            Some(working_home),
            failed_statistic,
            Arc::new(AtomicBool::new(true)),
        );
        let response = get_telemetry_report(failed_report, url).await;
        assert!(response.is_none());

        let client = Client::builder()
            .connect_timeout(Duration::from_secs(3))
            .timeout(Duration::from_secs(3))
            .build()
            .unwrap();

        // Only the successful report may have reached the server.
        let cnt_url = format!("{}/req-cnt", url);
        let response = client.get(cnt_url).send().await.unwrap();
        let body = response.text().await.unwrap();
        assert_eq!("1", body);
        tx.send(()).unwrap();
    }

    /// The uuid is generated once and then read back from the cache file.
    #[test]
    fn test_get_uuid() {
        let working_home_temp = tempfile::Builder::new()
            .prefix("greptimedb_telemetry")
            .tempdir()
            .unwrap();
        let working_home = working_home_temp.path().to_str().unwrap().to_string();

        let uuid = default_get_uuid(&Some(working_home.clone()));
        assert!(uuid.is_some());
        assert_eq!(uuid, default_get_uuid(&Some(working_home.clone())));
        assert_eq!(uuid, default_get_uuid(&Some(working_home)));
    }

    /// The uptime is always one of the coarse range labels.
    #[test]
    fn test_format_uptime() {
        let uptime = format_uptime();
        assert!(!uptime.is_empty());
        // Should be a valid general time range (no specific numbers)
        assert!(
            uptime == "hours"
                || uptime == "days"
                || uptime == "weeks"
                || uptime == "months"
                || uptime == "years"
        );
    }
}