common_greptimedb_telemetry/
lib.rs1use std::env;
16use std::io::ErrorKind;
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, LazyLock};
20use std::time::{Duration, SystemTime};
21
22use common_runtime::error::{Error, Result};
23use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
24use common_telemetry::{debug, info};
25use common_version::build_info;
26use reqwest::{Client, Response};
27use serde::{Deserialize, Serialize};
28
/// Endpoint that receives the anonymous usage statistics (sent as a JSON POST body).
pub const TELEMETRY_URL: &str = "https://telemetry.greptimestats.com/db/otel/statistics";

/// Name of the file, inside the working home (or the system temp dir), holding this node's UUID.
const UUID_FILE_NAME: &str = ".greptimedb-telemetry-uuid";

/// Reference instant for the reported uptime.
///
/// Being a `LazyLock`, it is captured the first time it is dereferenced, not necessarily at
/// process start.
static START_TIME: LazyLock<SystemTime> = LazyLock::new(SystemTime::now);

/// Telemetry reporting interval: 30 minutes.
pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(30 * 60);

/// Upper bound for establishing the connection to the telemetry endpoint.
const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound for a whole telemetry request, connection included.
const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
43
44pub enum GreptimeDBTelemetryTask {
45 Enable((RepeatedTask<Error>, Arc<AtomicBool>)),
46 Disable,
47}
48
49impl GreptimeDBTelemetryTask {
50 pub fn should_report(&self, value: bool) {
51 match self {
52 GreptimeDBTelemetryTask::Enable((_, should_report)) => {
53 should_report.store(value, Ordering::Relaxed);
54 }
55 GreptimeDBTelemetryTask::Disable => {}
56 }
57 }
58
59 pub fn enable(
60 interval: Duration,
61 task_fn: BoxedTaskFunction<Error>,
62 should_report: Arc<AtomicBool>,
63 ) -> Self {
64 GreptimeDBTelemetryTask::Enable((
65 RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
66 should_report,
67 ))
68 }
69
70 pub fn disable() -> Self {
71 GreptimeDBTelemetryTask::Disable
72 }
73
74 pub fn start(&self) -> Result<()> {
75 match self {
76 GreptimeDBTelemetryTask::Enable((task, _)) => {
77 print_anonymous_usage_data_disclaimer();
78 task.start(common_runtime::global_runtime())
79 }
80 GreptimeDBTelemetryTask::Disable => Ok(()),
81 }
82 }
83
84 pub async fn stop(&self) -> Result<()> {
85 match self {
86 GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await,
87 GreptimeDBTelemetryTask::Disable => Ok(()),
88 }
89 }
90}
91
92#[derive(Serialize, Deserialize, Debug)]
94struct StatisticData {
95 pub os: String,
97 pub version: String,
99 pub arch: String,
101 pub mode: Mode,
103 pub git_commit: String,
105 pub nodes: Option<i32>,
107 pub uuid: String,
109 pub uptime: String,
111}
112
113#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
114#[serde(rename_all = "lowercase")]
115pub enum Mode {
116 Distributed,
117 Standalone,
118}
119
120#[async_trait::async_trait]
121pub trait Collector {
122 fn get_version(&self) -> String {
123 build_info().version.to_string()
124 }
125
126 fn get_git_hash(&self) -> String {
127 build_info().commit.to_string()
128 }
129
130 fn get_os(&self) -> String {
131 env::consts::OS.to_string()
132 }
133
134 fn get_arch(&self) -> String {
135 env::consts::ARCH.to_string()
136 }
137
138 fn get_mode(&self) -> Mode;
139
140 fn get_retry(&self) -> i32;
141
142 fn inc_retry(&mut self);
143
144 fn set_uuid_cache(&mut self, uuid: String);
145
146 fn get_uuid_cache(&self) -> Option<String>;
147
148 async fn get_nodes(&self) -> Option<i32>;
149
150 fn get_uuid(&mut self, working_home: &Option<String>) -> Option<String> {
151 match self.get_uuid_cache() {
152 Some(uuid) => Some(uuid),
153 None => {
154 if self.get_retry() > 3 {
155 return None;
156 }
157 match default_get_uuid(working_home) {
158 Some(uuid) => {
159 self.set_uuid_cache(uuid.clone());
160 Some(uuid)
161 }
162 None => {
163 self.inc_retry();
164 None
165 }
166 }
167 }
168 }
169 }
170}
171
172fn print_anonymous_usage_data_disclaimer() {
173 info!(
174 "Attention: GreptimeDB now collects anonymous usage data to help improve its roadmap and prioritize features."
175 );
176 info!(
177 "To learn more about this anonymous program and how to deactivate it if you don't want to participate, please visit the following URL: "
178 );
179 info!("https://docs.greptime.com/reference/telemetry");
180}
181
182fn format_uptime() -> String {
185 let uptime_duration = START_TIME.elapsed().unwrap_or(Duration::ZERO);
186 let total_seconds = uptime_duration.as_secs();
187
188 if total_seconds < 86400 {
189 "hours".to_string()
190 } else if total_seconds < 604800 {
191 "days".to_string()
192 } else if total_seconds < 2629746 {
193 "weeks".to_string()
194 } else if total_seconds < 31556952 {
195 "months".to_string()
196 } else {
197 "years".to_string()
198 }
199}
200
201pub fn default_get_uuid(working_home: &Option<String>) -> Option<String> {
202 let temp_dir = env::temp_dir();
203
204 let mut path = PathBuf::new();
205 path.push(
206 working_home
207 .as_ref()
208 .map(Path::new)
209 .unwrap_or_else(|| temp_dir.as_path()),
210 );
211 path.push(UUID_FILE_NAME);
212
213 let path = path.as_path();
214 match std::fs::read(path) {
215 Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
216 Err(e) => {
217 if e.kind() == ErrorKind::NotFound {
218 let uuid = uuid::Uuid::new_v4().to_string();
219 let _ = std::fs::write(path, uuid.as_bytes());
220 Some(uuid)
221 } else {
222 None
223 }
224 }
225 }
226}
227
228pub struct GreptimeDBTelemetry {
237 statistics: Box<dyn Collector + Send + Sync>,
238 client: Option<Client>,
239 working_home: Option<String>,
240 telemetry_url: &'static str,
241 should_report: Arc<AtomicBool>,
242 report_times: usize,
243}
244
245#[async_trait::async_trait]
246impl TaskFunction<Error> for GreptimeDBTelemetry {
247 fn name(&self) -> &str {
248 "Greptimedb-telemetry-task"
249 }
250
251 async fn call(&mut self) -> Result<()> {
252 if self.should_report.load(Ordering::Relaxed) {
253 self.report_telemetry_info().await;
254 }
255 Ok(())
256 }
257}
258
259impl GreptimeDBTelemetry {
260 pub fn new(
261 working_home: Option<String>,
262 statistics: Box<dyn Collector + Send + Sync>,
263 should_report: Arc<AtomicBool>,
264 ) -> Self {
265 let client = Client::builder()
266 .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
267 .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT)
268 .build();
269 Self {
270 working_home,
271 statistics,
272 client: client.ok(),
273 telemetry_url: TELEMETRY_URL,
274 should_report,
275 report_times: 0,
276 }
277 }
278
279 pub async fn report_telemetry_info(&mut self) -> Option<Response> {
280 match self.statistics.get_uuid(&self.working_home) {
281 Some(uuid) => {
282 let data = StatisticData {
283 os: self.statistics.get_os(),
284 version: self.statistics.get_version(),
285 git_commit: self.statistics.get_git_hash(),
286 arch: self.statistics.get_arch(),
287 mode: self.statistics.get_mode(),
288 nodes: self.statistics.get_nodes().await,
289 uuid,
290 uptime: format_uptime(),
291 };
292
293 if let Some(client) = self.client.as_ref() {
294 if self.report_times == 0 {
295 info!("reporting greptimedb version: {:?}", data);
296 }
297 let result = client.post(self.telemetry_url).json(&data).send().await;
298 self.report_times += 1;
299 debug!("report version result: {:?}", result);
300 result.ok()
301 } else {
302 None
303 }
304 }
305 None => None,
306 }
307 }
308}
309
#[cfg(test)]
mod tests {
    use std::convert::Infallible;
    use std::env;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::time::Duration;

    use common_test_util::ports;
    use common_version::build_info;
    use hyper::Server;
    use hyper::service::{make_service_fn, service_fn};
    use reqwest::{Client, Response};
    use tokio::spawn;

    use crate::{
        Collector, GreptimeDBTelemetry, Mode, StatisticData, default_get_uuid, format_uptime,
    };

    /// Number of requests the mock server received on any path other than `/req-cnt`.
    static COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Mock telemetry endpoint. `/req-cnt` returns the number of reports received so far.
    /// Any other path counts the request and echoes its body back.
    async fn echo(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
        if req.uri().path() == "/req-cnt" {
            let body = hyper::Body::from(COUNT.load(Ordering::SeqCst).to_string());
            Ok(hyper::Response::new(body))
        } else {
            COUNT.fetch_add(1, Ordering::SeqCst);
            Ok(hyper::Response::new(req.into_body()))
        }
    }

    #[tokio::test]
    async fn test_greptimedb_telemetry() {
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let port: u16 = ports::get_port() as u16;

        // Bind before spawning so the listener already accepts connections when the first
        // report is sent, instead of racing the spawned task's first poll.
        let addr = ([127, 0, 0, 1], port).into();
        let make_svc =
            make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(echo)) });
        let server = Server::try_bind(&addr).unwrap().serve(make_svc);
        spawn(async move {
            let graceful = server.with_graceful_shutdown(async {
                rx.await.ok();
            });
            let _ = graceful.await;
        });

        struct TestStatistic;

        struct FailedStatistic;

        #[async_trait::async_trait]
        impl Collector for TestStatistic {
            fn get_mode(&self) -> Mode {
                Mode::Standalone
            }

            async fn get_nodes(&self) -> Option<i32> {
                Some(1)
            }

            fn get_retry(&self) -> i32 {
                unimplemented!()
            }

            fn inc_retry(&mut self) {
                unimplemented!()
            }

            fn set_uuid_cache(&mut self, _: String) {
                unimplemented!()
            }

            fn get_uuid_cache(&self) -> Option<String> {
                unimplemented!()
            }

            fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
                Some("test".to_string())
            }
        }

        #[async_trait::async_trait]
        impl Collector for FailedStatistic {
            fn get_mode(&self) -> Mode {
                Mode::Standalone
            }

            async fn get_nodes(&self) -> Option<i32> {
                None
            }

            fn get_retry(&self) -> i32 {
                unimplemented!()
            }

            fn inc_retry(&mut self) {
                unimplemented!()
            }

            fn set_uuid_cache(&mut self, _: String) {
                unimplemented!()
            }

            fn get_uuid_cache(&self) -> Option<String> {
                unimplemented!()
            }

            fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
                None
            }
        }

        async fn get_telemetry_report(
            mut report: GreptimeDBTelemetry,
            url: &'static str,
        ) -> Option<Response> {
            report.telemetry_url = url;
            report.report_telemetry_info().await
        }

        let working_home_temp = tempfile::Builder::new()
            .prefix("greptimedb_telemetry")
            .tempdir()
            .unwrap();
        let working_home = working_home_temp.path().to_str().unwrap().to_string();

        // `GreptimeDBTelemetry::telemetry_url` is `&'static str`. Leaking this small test-only
        // string yields a genuine `'static` borrow without any `unsafe` lifetime extension.
        let url: &'static str = Box::leak(format!("http://localhost:{}", port).into_boxed_str());

        let test_statistic = Box::new(TestStatistic);
        let test_report = GreptimeDBTelemetry::new(
            Some(working_home.clone()),
            test_statistic,
            Arc::new(AtomicBool::new(true)),
        );
        let response = get_telemetry_report(test_report, url).await.unwrap();

        let body = response.json::<StatisticData>().await.unwrap();
        assert_eq!(env::consts::ARCH, body.arch);
        assert_eq!(env::consts::OS, body.os);
        assert_eq!(build_info().version, body.version);
        assert_eq!(build_info().commit, body.git_commit);
        assert_eq!(Mode::Standalone, body.mode);
        assert_eq!(1, body.nodes.unwrap());
        assert!(!body.uptime.is_empty());

        let failed_statistic = Box::new(FailedStatistic);
        let failed_report = GreptimeDBTelemetry::new(
            Some(working_home),
            failed_statistic,
            Arc::new(AtomicBool::new(true)),
        );
        let response = get_telemetry_report(failed_report, url).await;
        assert!(response.is_none());

        let client = Client::builder()
            .connect_timeout(Duration::from_secs(3))
            .timeout(Duration::from_secs(3))
            .build()
            .unwrap();

        // Only the successful report should have reached the server.
        let cnt_url = format!("{}/req-cnt", url);
        let response = client.get(cnt_url).send().await.unwrap();
        let body = response.text().await.unwrap();
        assert_eq!("1", body);
        tx.send(()).unwrap();
    }

    #[test]
    fn test_get_uuid() {
        let working_home_temp = tempfile::Builder::new()
            .prefix("greptimedb_telemetry")
            .tempdir()
            .unwrap();
        let working_home = working_home_temp.path().to_str().unwrap().to_string();

        let uuid = default_get_uuid(&Some(working_home.clone()));
        assert!(uuid.is_some());
        assert_eq!(uuid, default_get_uuid(&Some(working_home.clone())));
        assert_eq!(uuid, default_get_uuid(&Some(working_home)));
    }

    #[test]
    fn test_format_uptime() {
        let uptime = format_uptime();
        assert!(!uptime.is_empty());
        assert!(
            uptime == "hours"
                || uptime == "days"
                || uptime == "weeks"
                || uptime == "months"
                || uptime == "years"
        );
    }
}