1use std::collections::HashMap;
17use std::env;
18use std::io::IsTerminal;
19use std::sync::{Arc, Mutex, Once};
20use std::time::Duration;
21
22use common_base::serde::empty_string_as_default;
23use once_cell::sync::{Lazy, OnceCell};
24use opentelemetry::trace::TracerProvider;
25use opentelemetry::{global, KeyValue};
26use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
27use opentelemetry_sdk::propagation::TraceContextPropagator;
28use opentelemetry_sdk::trace::Sampler;
29use opentelemetry_semantic_conventions::resource;
30use serde::{Deserialize, Serialize};
31use tracing_appender::non_blocking::WorkerGuard;
32use tracing_appender::rolling::{RollingFileAppender, Rotation};
33use tracing_log::LogTracer;
34use tracing_subscriber::filter::{FilterFn, Targets};
35use tracing_subscriber::fmt::Layer;
36use tracing_subscriber::layer::SubscriberExt;
37use tracing_subscriber::prelude::*;
38use tracing_subscriber::{filter, EnvFilter, Registry};
39
40use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
41
/// Endpoint used by `build_otlp_exporter` when the gRPC export protocol is
/// selected and `LoggingOptions::otlp_endpoint` is not set.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";

/// Endpoint used by `build_otlp_exporter` when the HTTP export protocol is
/// selected (or no protocol is configured) and `LoggingOptions::otlp_endpoint`
/// is not set.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318/v1/traces";

/// Default directory name for log files.
///
/// NOTE(review): not referenced inside this file; presumably callers use it to
/// fill in `LoggingOptions::dir` — confirm against the call sites.
pub const DEFAULT_LOGGING_DIR: &str = "logs";
50
/// Handle to the reloadable `Targets` filter installed by `init_global_logging`.
///
/// It is set exactly once during initialisation; `init_global_logging` panics
/// if the cell is already populated. The cell stays empty until logging has
/// been initialised.
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
    OnceCell::new();
54
/// Logging and trace-export configuration consumed by `init_global_logging`.
///
/// Because of `#[serde(default)]`, every field missing from the input is taken
/// from the `Default` implementation of this struct.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
    /// Directory the rolling log files are written to. An empty string
    /// disables all file-based logging layers.
    pub dir: String,

    /// Log filter directive, parsed as `tracing_subscriber::filter::Targets`.
    /// When `None`, the environment variable named by `EnvFilter::DEFAULT_ENV`
    /// is consulted, then `DEFAULT_LOG_TARGETS`.
    pub level: Option<String>,

    /// Output format of the log layers. An empty string in the input is
    /// deserialized as the default format.
    #[serde(default, deserialize_with = "empty_string_as_default")]
    pub log_format: LogFormat,

    /// Maximum number of files kept by each rolling file appender.
    pub max_log_files: usize,

    /// Whether log events are also written to stdout.
    pub append_stdout: bool,

    /// Whether spans are exported through an OTLP exporter.
    pub enable_otlp_tracing: bool,

    /// OTLP endpoint. When `None`, a protocol-specific default endpoint is used.
    pub otlp_endpoint: Option<String>,

    /// Sampler configuration handed to `create_sampler`. When `None`, a
    /// parent-based always-on sampler is used.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,

    /// OTLP transport. When `None`, HTTP is used.
    pub otlp_export_protocol: Option<OtlpExportProtocol>,

    /// Extra headers attached to the OTLP exporter. Only the HTTP exporter
    /// reads them in this file. Omitted from serialized output when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub otlp_headers: HashMap<String, String>,
}
91
/// Transport used by the OTLP span exporter.
///
/// Serialized in snake case, i.e. `"grpc"` and `"http"`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
    /// Export over gRPC (built with the tonic exporter).
    Grpc,

    /// Export over HTTP using the binary protocol encoding.
    Http,
}
102
/// Slow-query recording configuration.
///
/// Because of `#[serde(default)]`, every field missing from the input is taken
/// from the `Default` implementation of this struct. Within this file only
/// `enable` and `record_type` are read (by `build_slow_query_logger`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SlowQueryOptions {
    /// Whether slow-query recording is enabled.
    pub enable: bool,

    /// Where slow queries are recorded. An empty string in the input is
    /// deserialized as the default record type.
    #[serde(deserialize_with = "empty_string_as_default")]
    pub record_type: SlowQueriesRecordType,

    /// Duration threshold, written in humantime form (e.g. `"30s"`).
    /// NOTE(review): presumably queries running longer than this count as
    /// slow — the consumer is outside this file, confirm there.
    #[serde(with = "humantime_serde")]
    pub threshold: Duration,

    /// Sampling ratio.
    /// NOTE(review): presumably the fraction of slow queries that get
    /// recorded, in `(0.0, 1.0]` — confirm against the consumer.
    pub sample_ratio: f64,

    /// Time-to-live, written in humantime form.
    /// NOTE(review): presumably the retention of recorded slow queries —
    /// confirm against the consumer.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
}
126
127impl Default for SlowQueryOptions {
128 fn default() -> Self {
129 Self {
130 enable: true,
131 record_type: SlowQueriesRecordType::SystemTable,
132 threshold: Duration::from_secs(30),
133 sample_ratio: 1.0,
134 ttl: Duration::from_days(90),
135 }
136 }
137}
138
/// Destination of slow-query records.
///
/// Serialized in snake case, i.e. `"system_table"` and `"log"`.
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SlowQueriesRecordType {
    /// The default record type.
    /// NOTE(review): presumably records are stored in a system table; the
    /// code that does so is outside this file.
    #[default]
    SystemTable,
    /// Records are written to a dedicated rolling log file, set up by
    /// `build_slow_query_logger` when a log directory is configured.
    Log,
}
148
/// Output format of the log layers.
///
/// Serialized in snake case, i.e. `"json"` and `"text"`.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
    /// Layers are built with the JSON formatter.
    Json,
    /// Layers are built with the formatter's default text output. This is the
    /// default format.
    #[default]
    Text,
}
156
157impl PartialEq for LoggingOptions {
158 fn eq(&self, other: &Self) -> bool {
159 self.dir == other.dir
160 && self.level == other.level
161 && self.enable_otlp_tracing == other.enable_otlp_tracing
162 && self.otlp_endpoint == other.otlp_endpoint
163 && self.tracing_sample_ratio == other.tracing_sample_ratio
164 && self.append_stdout == other.append_stdout
165 }
166}
167
// Marker impl: declares the hand-written `PartialEq` for `LoggingOptions` to be
// a full equivalence relation.
impl Eq for LoggingOptions {}
169
170impl Default for LoggingOptions {
171 fn default() -> Self {
172 Self {
173 dir: "".to_string(),
175 level: None,
176 log_format: LogFormat::Text,
177 enable_otlp_tracing: false,
178 otlp_endpoint: None,
179 tracing_sample_ratio: None,
180 append_stdout: true,
181 max_log_files: 720,
183 otlp_export_protocol: None,
184 otlp_headers: HashMap::new(),
185 }
186 }
187}
188
/// Extra tracing configuration consumed by `init_global_logging`.
///
/// The struct has no fields unless the `tokio-console` feature is enabled.
#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TracingOptions {
    /// Socket address the tokio-console server binds to. `None` means no
    /// tokio-console layer is installed. `init_global_logging` panics if the
    /// value cannot be parsed as a `std::net::SocketAddr`.
    #[cfg(feature = "tokio-console")]
    pub tokio_console_addr: Option<String>,
}
194
195pub fn init_default_ut_logging() {
198 static START: Once = Once::new();
199
200 START.call_once(|| {
201 let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
202
203 let dir =
208 env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
209
210 let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
211 "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
212 );
213 let opts = LoggingOptions {
214 dir: dir.clone(),
215 level: Some(level),
216 ..Default::default()
217 };
218 *g = Some(init_global_logging(
219 "unittest",
220 &opts,
221 &TracingOptions::default(),
222 None,
223 None,
224 ));
225
226 crate::info!("logs dir = {}", dir);
227 });
228}
229
/// Holds the worker guards created by `init_default_ut_logging`, so they are
/// not dropped when that function returns.
static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

/// Filter directive used by `init_global_logging` when neither
/// `LoggingOptions::level` nor the environment provides one.
const DEFAULT_LOG_TARGETS: &str = "info";
234
/// Installs the process-wide tracing subscriber.
///
/// The body runs at most once per process (guarded by a function-local
/// `Once`). On every later call nothing is initialised and an empty `Vec` is
/// returned.
///
/// # Arguments
/// * `app_name` - reported as the service name on exported traces.
/// * `opts` - selects the stdout/file layers, the log format, the filter and
///   the OTLP export settings.
/// * `tracing_opts` - only read when the `tokio-console` feature is enabled.
/// * `node_id` - reported as the service instance id on exported traces;
///   `"none"` when absent.
/// * `slow_query_opts` - forwarded to `build_slow_query_logger`.
///
/// # Returns
/// The `WorkerGuard`s of every non-blocking writer that was created.
/// NOTE(review): per the `tracing_appender` documentation, buffered logs are
/// flushed when a guard is dropped, so callers should keep them alive for as
/// long as logging is needed.
///
/// # Panics
/// Panics if the log tracer cannot be installed, a rolling file appender
/// cannot be created, the filter directive does not parse, `RELOAD_HANDLE` is
/// already set, the tokio-console address is invalid, the OTLP exporter
/// cannot be built, or a global subscriber is already installed.
// `print_stdout` is allowed for the `println!` in the tokio-console branch.
#[allow(clippy::print_stdout)]
pub fn init_global_logging(
    app_name: &str,
    opts: &LoggingOptions,
    tracing_opts: &TracingOptions,
    node_id: Option<String>,
    slow_query_opts: Option<&SlowQueryOptions>,
) -> Vec<WorkerGuard> {
    static START: Once = Once::new();
    let mut guards = vec![];

    START.call_once(|| {
        // Install the `log`-to-`tracing` bridge.
        LogTracer::init().expect("log tracer must be valid");

        // Stdout layer: non-blocking writer, ANSI colours only when stdout is a terminal.
        let stdout_logging_layer = if opts.append_stdout {
            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(std::io::stdout().is_terminal())
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(std::io::stdout().is_terminal())
                        .boxed(),
                )
            }
        } else {
            None
        };

        // Main file layer: hourly-rotated `greptimedb*` files in `opts.dir`.
        // Skipped entirely when no directory is configured.
        let file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .boxed(),
                )
            } else {
                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
            }
        } else {
            None
        };

        // Error file layer: same rolling policy, separate `greptimedb-err*`
        // files, filtered with `LevelFilter::ERROR`.
        let err_file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb-err")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            }
        } else {
            None
        };

        // Optional dedicated slow-query log file; its guard is pushed onto `guards`.
        let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);

        // Resolve the filter directive in priority order:
        // 1. `opts.level`,
        // 2. the environment variable named by `EnvFilter::DEFAULT_ENV`,
        // 3. `DEFAULT_LOG_TARGETS`.
        let filter = opts
            .level
            .as_deref()
            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
            .unwrap_or(DEFAULT_LOG_TARGETS)
            .parse::<filter::Targets>()
            .expect("error parsing log level string");

        // Wrap the filter in a reload layer and publish its handle through `RELOAD_HANDLE`.
        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());

        RELOAD_HANDLE
            .set(reload_handle)
            .expect("reload handle already set, maybe init_global_logging get called twice?");

        // With the `tokio-console` feature, an optional console layer is added
        // right after the reloadable filter.
        #[cfg(feature = "tokio-console")]
        let subscriber = {
            let tokio_console_layer =
                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
                    let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
                        panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
                    });
                    println!("tokio-console listening on {addr}");

                    Some(
                        console_subscriber::ConsoleLayer::builder()
                            .server_addr(addr)
                            .spawn(),
                    )
                } else {
                    None
                };

            Registry::default()
                .with(dyn_filter)
                .with(tokio_console_layer)
                .with(stdout_logging_layer)
                .with(file_logging_layer)
                .with(err_file_logging_layer)
                .with(slow_query_logging_layer)
        };

        // Touch `tracing_opts` so it counts as used when the `tokio-console`
        // feature is disabled.
        let _ = tracing_opts;

        // The reloadable filter is attached directly to the `Registry`, which
        // matches the handle type stored in `RELOAD_HANDLE`.
        #[cfg(not(feature = "tokio-console"))]
        let subscriber = Registry::default()
            .with(dyn_filter)
            .with(stdout_logging_layer)
            .with(file_logging_layer)
            .with(err_file_logging_layer)
            .with(slow_query_logging_layer);

        if opts.enable_otlp_tracing {
            global::set_text_map_propagator(TraceContextPropagator::new());

            // Wrap the configured sampler in `ParentBased`; with no sampler
            // configured, the root sampler is always-on.
            let sampler = opts
                .tracing_sample_ratio
                .as_ref()
                .map(create_sampler)
                .map(Sampler::ParentBased)
                .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));

            // Batch-exporting tracer provider, tagged with service name,
            // instance id, version and process id.
            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
                .with_batch_exporter(build_otlp_exporter(opts))
                .with_sampler(sampler)
                .with_resource(
                    opentelemetry_sdk::Resource::builder_empty()
                        .with_attributes([
                            KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
                            KeyValue::new(
                                resource::SERVICE_INSTANCE_ID,
                                node_id.unwrap_or("none".to_string()),
                            ),
                            KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
                            KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
                        ])
                        .build(),
                )
                .build();
            let tracer = provider.tracer("greptimedb");

            tracing::subscriber::set_global_default(
                subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
            )
            .expect("error setting global tracing subscriber");
        } else {
            tracing::subscriber::set_global_default(subscriber)
                .expect("error setting global tracing subscriber");
        }
    });

    guards
}
445
446fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter {
447 let protocol = opts
448 .otlp_export_protocol
449 .clone()
450 .unwrap_or(OtlpExportProtocol::Http);
451
452 let endpoint = opts
453 .otlp_endpoint
454 .as_ref()
455 .map(|e| {
456 if e.starts_with("http") {
457 e.to_string()
458 } else {
459 format!("http://{}", e)
460 }
461 })
462 .unwrap_or_else(|| match protocol {
463 OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
464 OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
465 });
466
467 match protocol {
468 OtlpExportProtocol::Grpc => SpanExporter::builder()
469 .with_tonic()
470 .with_endpoint(endpoint)
471 .build()
472 .expect("Failed to create OTLP gRPC exporter "),
473
474 OtlpExportProtocol::Http => SpanExporter::builder()
475 .with_http()
476 .with_endpoint(endpoint)
477 .with_protocol(Protocol::HttpBinary)
478 .with_headers(opts.otlp_headers.clone())
479 .build()
480 .expect("Failed to create OTLP HTTP exporter "),
481 }
482}
483
484fn build_slow_query_logger<S>(
485 opts: &LoggingOptions,
486 slow_query_opts: Option<&SlowQueryOptions>,
487 guards: &mut Vec<WorkerGuard>,
488) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
489where
490 S: tracing::Subscriber
491 + Send
492 + 'static
493 + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
494{
495 if let Some(slow_query_opts) = slow_query_opts {
496 if !opts.dir.is_empty()
497 && slow_query_opts.enable
498 && slow_query_opts.record_type == SlowQueriesRecordType::Log
499 {
500 let rolling_appender = RollingFileAppender::builder()
501 .rotation(Rotation::HOURLY)
502 .filename_prefix("greptimedb-slow-queries")
503 .max_log_files(opts.max_log_files)
504 .build(&opts.dir)
505 .unwrap_or_else(|e| {
506 panic!(
507 "initializing rolling file appender at {} failed: {}",
508 &opts.dir, e
509 )
510 });
511 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
512 guards.push(guard);
513
514 let slow_query_filter = FilterFn::new(|metadata| {
516 metadata
517 .fields()
518 .iter()
519 .any(|field| field.name().contains("slow"))
520 });
521
522 if opts.log_format == LogFormat::Json {
523 Some(
524 Layer::new()
525 .json()
526 .with_writer(writer)
527 .with_ansi(false)
528 .with_filter(slow_query_filter)
529 .boxed(),
530 )
531 } else {
532 Some(
533 Layer::new()
534 .with_writer(writer)
535 .with_ansi(false)
536 .with_filter(slow_query_filter)
537 .boxed(),
538 )
539 }
540 } else {
541 None
542 }
543 } else {
544 None
545 }
546}
547
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty object yields the struct defaults.
    #[test]
    fn test_logging_options_deserialization_default() {
        let parsed: LoggingOptions = serde_json::from_str(r#"{}"#).unwrap();

        assert_eq!(parsed.log_format, LogFormat::Text);
        assert_eq!(parsed.dir, "");
        assert_eq!(parsed.level, None);
        assert!(parsed.append_stdout);
    }

    /// An empty `log_format` string falls back to the default format.
    #[test]
    fn test_logging_options_deserialization_empty_log_format() {
        let parsed: LoggingOptions = serde_json::from_str(r#"{"log_format": ""}"#).unwrap();

        assert_eq!(parsed.log_format, LogFormat::Text);
    }

    /// Both snake-case format names are accepted.
    #[test]
    fn test_logging_options_deserialization_valid_log_format() {
        let cases = [
            (r#"{"log_format": "json"}"#, LogFormat::Json),
            (r#"{"log_format": "text"}"#, LogFormat::Text),
        ];

        for (json, expected) in cases {
            let parsed: LoggingOptions = serde_json::from_str(json).unwrap();
            assert_eq!(parsed.log_format, expected);
        }
    }

    /// A missing `log_format` falls back to the default while other fields are kept.
    #[test]
    fn test_logging_options_deserialization_missing_log_format() {
        let parsed: LoggingOptions = serde_json::from_str(r#"{"dir": "/tmp/logs"}"#).unwrap();

        assert_eq!(parsed.log_format, LogFormat::Text);
        assert_eq!(parsed.dir, "/tmp/logs");
    }

    /// Without `record_type` the default record type is used.
    #[test]
    fn test_slow_query_options_deserialization_default() {
        let parsed: SlowQueryOptions =
            serde_json::from_str(r#"{"enable": true, "threshold": "30s"}"#).unwrap();

        assert_eq!(parsed.record_type, SlowQueriesRecordType::SystemTable);
        assert!(parsed.enable);
    }

    /// An empty `record_type` string falls back to the default record type.
    #[test]
    fn test_slow_query_options_deserialization_empty_record_type() {
        let parsed: SlowQueryOptions =
            serde_json::from_str(r#"{"enable": true, "record_type": "", "threshold": "30s"}"#)
                .unwrap();

        assert_eq!(parsed.record_type, SlowQueriesRecordType::SystemTable);
        assert!(parsed.enable);
    }

    /// Both snake-case record type names are accepted.
    #[test]
    fn test_slow_query_options_deserialization_valid_record_type() {
        let cases = [
            (
                r#"{"enable": true, "record_type": "system_table", "threshold": "30s"}"#,
                SlowQueriesRecordType::SystemTable,
            ),
            (
                r#"{"enable": true, "record_type": "log", "threshold": "30s"}"#,
                SlowQueriesRecordType::Log,
            ),
        ];

        for (json, expected) in cases {
            let parsed: SlowQueryOptions = serde_json::from_str(json).unwrap();
            assert_eq!(parsed.record_type, expected);
        }
    }

    /// A missing `record_type` falls back to the default while `enable` is kept.
    #[test]
    fn test_slow_query_options_deserialization_missing_record_type() {
        let parsed: SlowQueryOptions =
            serde_json::from_str(r#"{"enable": false, "threshold": "30s"}"#).unwrap();

        assert_eq!(parsed.record_type, SlowQueriesRecordType::SystemTable);
        assert!(!parsed.enable);
    }

    /// Both snake-case protocol names are accepted.
    #[test]
    fn test_otlp_export_protocol_deserialization_valid_values() {
        let cases = [
            (r#""grpc""#, OtlpExportProtocol::Grpc),
            (r#""http""#, OtlpExportProtocol::Http),
        ];

        for (json, expected) in cases {
            let parsed: OtlpExportProtocol = serde_json::from_str(json).unwrap();
            assert_eq!(parsed, expected);
        }
    }
}