common_greptimedb_telemetry/
lib.rs1use std::env;
16use std::io::ErrorKind;
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use common_runtime::error::{Error, Result};
23use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
24use common_telemetry::{debug, info};
25use common_version::build_info;
26use reqwest::{Client, Response};
27use serde::{Deserialize, Serialize};
28
/// Endpoint that [`GreptimeDBTelemetry::new`] installs as the default report target.
pub const TELEMETRY_URL: &str = "https://telemetry.greptimestats.com/db/otel/statistics";
/// Name of the file (inside the working home, or the OS temp dir) that stores the UUID.
const UUID_FILE_NAME: &str = ".greptimedb-telemetry-uuid";

/// Thirty minutes. Presumably the interval callers pass to
/// `GreptimeDBTelemetryTask::enable`; it is not referenced in this file.
pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(30 * 60);
/// Connect timeout of the HTTP client used for reporting.
const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Whole-request timeout of the HTTP client used for reporting.
const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
40
41pub enum GreptimeDBTelemetryTask {
42 Enable((RepeatedTask<Error>, Arc<AtomicBool>)),
43 Disable,
44}
45
46impl GreptimeDBTelemetryTask {
47 pub fn should_report(&self, value: bool) {
48 match self {
49 GreptimeDBTelemetryTask::Enable((_, should_report)) => {
50 should_report.store(value, Ordering::Relaxed);
51 }
52 GreptimeDBTelemetryTask::Disable => {}
53 }
54 }
55
56 pub fn enable(
57 interval: Duration,
58 task_fn: BoxedTaskFunction<Error>,
59 should_report: Arc<AtomicBool>,
60 ) -> Self {
61 GreptimeDBTelemetryTask::Enable((
62 RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
63 should_report,
64 ))
65 }
66
67 pub fn disable() -> Self {
68 GreptimeDBTelemetryTask::Disable
69 }
70
71 pub fn start(&self) -> Result<()> {
72 match self {
73 GreptimeDBTelemetryTask::Enable((task, _)) => {
74 print_anonymous_usage_data_disclaimer();
75 task.start(common_runtime::global_runtime())
76 }
77 GreptimeDBTelemetryTask::Disable => Ok(()),
78 }
79 }
80
81 pub async fn stop(&self) -> Result<()> {
82 match self {
83 GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await,
84 GreptimeDBTelemetryTask::Disable => Ok(()),
85 }
86 }
87}
88
89#[derive(Serialize, Deserialize, Debug)]
91struct StatisticData {
92 pub os: String,
94 pub version: String,
96 pub arch: String,
98 pub mode: Mode,
100 pub git_commit: String,
102 pub nodes: Option<i32>,
104 pub uuid: String,
106}
107
108#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
109#[serde(rename_all = "lowercase")]
110pub enum Mode {
111 Distributed,
112 Standalone,
113}
114
115#[async_trait::async_trait]
116pub trait Collector {
117 fn get_version(&self) -> String {
118 build_info().version.to_string()
119 }
120
121 fn get_git_hash(&self) -> String {
122 build_info().commit.to_string()
123 }
124
125 fn get_os(&self) -> String {
126 env::consts::OS.to_string()
127 }
128
129 fn get_arch(&self) -> String {
130 env::consts::ARCH.to_string()
131 }
132
133 fn get_mode(&self) -> Mode;
134
135 fn get_retry(&self) -> i32;
136
137 fn inc_retry(&mut self);
138
139 fn set_uuid_cache(&mut self, uuid: String);
140
141 fn get_uuid_cache(&self) -> Option<String>;
142
143 async fn get_nodes(&self) -> Option<i32>;
144
145 fn get_uuid(&mut self, working_home: &Option<String>) -> Option<String> {
146 match self.get_uuid_cache() {
147 Some(uuid) => Some(uuid),
148 None => {
149 if self.get_retry() > 3 {
150 return None;
151 }
152 match default_get_uuid(working_home) {
153 Some(uuid) => {
154 self.set_uuid_cache(uuid.clone());
155 Some(uuid)
156 }
157 None => {
158 self.inc_retry();
159 None
160 }
161 }
162 }
163 }
164 }
165}
166
167fn print_anonymous_usage_data_disclaimer() {
168 info!("Attention: GreptimeDB now collects anonymous usage data to help improve its roadmap and prioritize features.");
169 info!(
170 "To learn more about this anonymous program and how to deactivate it if you don't want to participate, please visit the following URL: ");
171 info!("https://docs.greptime.com/reference/telemetry");
172}
173
174pub fn default_get_uuid(working_home: &Option<String>) -> Option<String> {
175 let temp_dir = env::temp_dir();
176
177 let mut path = PathBuf::new();
178 path.push(
179 working_home
180 .as_ref()
181 .map(Path::new)
182 .unwrap_or_else(|| temp_dir.as_path()),
183 );
184 path.push(UUID_FILE_NAME);
185
186 let path = path.as_path();
187 match std::fs::read(path) {
188 Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
189 Err(e) => {
190 if e.kind() == ErrorKind::NotFound {
191 let uuid = uuid::Uuid::new_v4().to_string();
192 let _ = std::fs::write(path, uuid.as_bytes());
193 Some(uuid)
194 } else {
195 None
196 }
197 }
198 }
199}
200
201pub struct GreptimeDBTelemetry {
210 statistics: Box<dyn Collector + Send + Sync>,
211 client: Option<Client>,
212 working_home: Option<String>,
213 telemetry_url: &'static str,
214 should_report: Arc<AtomicBool>,
215 report_times: usize,
216}
217
218#[async_trait::async_trait]
219impl TaskFunction<Error> for GreptimeDBTelemetry {
220 fn name(&self) -> &str {
221 "Greptimedb-telemetry-task"
222 }
223
224 async fn call(&mut self) -> Result<()> {
225 if self.should_report.load(Ordering::Relaxed) {
226 self.report_telemetry_info().await;
227 }
228 Ok(())
229 }
230}
231
232impl GreptimeDBTelemetry {
233 pub fn new(
234 working_home: Option<String>,
235 statistics: Box<dyn Collector + Send + Sync>,
236 should_report: Arc<AtomicBool>,
237 ) -> Self {
238 let client = Client::builder()
239 .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
240 .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT)
241 .build();
242 Self {
243 working_home,
244 statistics,
245 client: client.ok(),
246 telemetry_url: TELEMETRY_URL,
247 should_report,
248 report_times: 0,
249 }
250 }
251
252 pub async fn report_telemetry_info(&mut self) -> Option<Response> {
253 match self.statistics.get_uuid(&self.working_home) {
254 Some(uuid) => {
255 let data = StatisticData {
256 os: self.statistics.get_os(),
257 version: self.statistics.get_version(),
258 git_commit: self.statistics.get_git_hash(),
259 arch: self.statistics.get_arch(),
260 mode: self.statistics.get_mode(),
261 nodes: self.statistics.get_nodes().await,
262 uuid,
263 };
264
265 if let Some(client) = self.client.as_ref() {
266 if self.report_times == 0 {
267 info!("reporting greptimedb version: {:?}", data);
268 }
269 let result = client.post(self.telemetry_url).json(&data).send().await;
270 self.report_times += 1;
271 debug!("report version result: {:?}", result);
272 result.ok()
273 } else {
274 None
275 }
276 }
277 None => None,
278 }
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use std::convert::Infallible;
285 use std::env;
286 use std::sync::atomic::{AtomicBool, AtomicUsize};
287 use std::sync::Arc;
288 use std::time::Duration;
289
290 use common_test_util::ports;
291 use common_version::build_info;
292 use hyper::service::{make_service_fn, service_fn};
293 use hyper::Server;
294 use reqwest::{Client, Response};
295 use tokio::spawn;
296
297 use crate::{default_get_uuid, Collector, GreptimeDBTelemetry, Mode, StatisticData};
298
299 static COUNT: AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
300
301 async fn echo(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
302 let path = req.uri().path();
303 if path == "/req-cnt" {
304 let body = hyper::Body::from(format!(
305 "{}",
306 COUNT.load(std::sync::atomic::Ordering::SeqCst)
307 ));
308 Ok(hyper::Response::new(body))
309 } else {
310 COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
311 Ok(hyper::Response::new(req.into_body()))
312 }
313 }
314
315 #[tokio::test]
316 async fn test_gretimedb_telemetry() {
317 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
318 let port: u16 = ports::get_port() as u16;
319 spawn(async move {
320 let make_svc = make_service_fn(|_conn| {
321 async { Ok::<_, Infallible>(service_fn(echo)) }
325 });
326 let addr = ([127, 0, 0, 1], port).into();
327
328 let server = Server::try_bind(&addr).unwrap().serve(make_svc);
329 let graceful = server.with_graceful_shutdown(async {
330 rx.await.ok();
331 });
332 let _ = graceful.await;
333 Ok::<_, Infallible>(())
334 });
335 struct TestStatistic;
336
337 struct FailedStatistic;
338
339 #[async_trait::async_trait]
340 impl Collector for TestStatistic {
341 fn get_mode(&self) -> Mode {
342 Mode::Standalone
343 }
344
345 async fn get_nodes(&self) -> Option<i32> {
346 Some(1)
347 }
348
349 fn get_retry(&self) -> i32 {
350 unimplemented!()
351 }
352
353 fn inc_retry(&mut self) {
354 unimplemented!()
355 }
356
357 fn set_uuid_cache(&mut self, _: String) {
358 unimplemented!()
359 }
360
361 fn get_uuid_cache(&self) -> Option<String> {
362 unimplemented!()
363 }
364
365 fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
366 Some("test".to_string())
367 }
368 }
369
370 #[async_trait::async_trait]
371 impl Collector for FailedStatistic {
372 fn get_mode(&self) -> Mode {
373 Mode::Standalone
374 }
375
376 async fn get_nodes(&self) -> Option<i32> {
377 None
378 }
379
380 fn get_retry(&self) -> i32 {
381 unimplemented!()
382 }
383
384 fn inc_retry(&mut self) {
385 unimplemented!()
386 }
387
388 fn set_uuid_cache(&mut self, _: String) {
389 unimplemented!()
390 }
391
392 fn get_uuid_cache(&self) -> Option<String> {
393 unimplemented!()
394 }
395
396 fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
397 None
398 }
399 }
400
401 async fn get_telemetry_report(
402 mut report: GreptimeDBTelemetry,
403 url: &'static str,
404 ) -> Option<Response> {
405 report.telemetry_url = url;
406 report.report_telemetry_info().await
407 }
408
409 fn contravariance<'a>(x: &'a str) -> &'static str
410 where
411 'static: 'a,
412 {
413 unsafe { std::mem::transmute(x) }
414 }
415
416 let working_home_temp = tempfile::Builder::new()
417 .prefix("greptimedb_telemetry")
418 .tempdir()
419 .unwrap();
420 let working_home = working_home_temp.path().to_str().unwrap().to_string();
421
422 let test_statistic = Box::new(TestStatistic);
423 let test_report = GreptimeDBTelemetry::new(
424 Some(working_home.clone()),
425 test_statistic,
426 Arc::new(AtomicBool::new(true)),
427 );
428 let url = format!("http://localhost:{}", port);
429 let response = {
430 let url = contravariance(url.as_str());
431 get_telemetry_report(test_report, url).await.unwrap()
432 };
433
434 let body = response.json::<StatisticData>().await.unwrap();
435 assert_eq!(env::consts::ARCH, body.arch);
436 assert_eq!(env::consts::OS, body.os);
437 assert_eq!(build_info().version, body.version);
438 assert_eq!(build_info().commit, body.git_commit);
439 assert_eq!(Mode::Standalone, body.mode);
440 assert_eq!(1, body.nodes.unwrap());
441
442 let failed_statistic = Box::new(FailedStatistic);
443 let failed_report = GreptimeDBTelemetry::new(
444 Some(working_home),
445 failed_statistic,
446 Arc::new(AtomicBool::new(true)),
447 );
448 let response = {
449 let url = contravariance(url.as_str());
450 get_telemetry_report(failed_report, url).await
451 };
452 assert!(response.is_none());
453
454 let client = Client::builder()
455 .connect_timeout(Duration::from_secs(3))
456 .timeout(Duration::from_secs(3))
457 .build()
458 .unwrap();
459
460 let cnt_url = format!("{}/req-cnt", url);
461 let response = client.get(cnt_url).send().await.unwrap();
462 let body = response.text().await.unwrap();
463 assert_eq!("1", body);
464 tx.send(()).unwrap();
465 }
466
/// The first lookup creates a UUID; later lookups in the same directory return the same value.
#[test]
fn test_get_uuid() {
    let temp_home = tempfile::Builder::new()
        .prefix("greptimedb_telemetry")
        .tempdir()
        .unwrap();
    let home = Some(temp_home.path().to_str().unwrap().to_string());

    let first = default_get_uuid(&home);
    assert!(first.is_some());

    for _ in 0..2 {
        assert_eq!(first, default_get_uuid(&home));
    }
}
480}