1use std::env;
17use std::io::IsTerminal;
18use std::sync::{Arc, Mutex, Once};
19use std::time::Duration;
20
21use once_cell::sync::{Lazy, OnceCell};
22use opentelemetry::{global, KeyValue};
23use opentelemetry_otlp::{Protocol, SpanExporterBuilder, WithExportConfig};
24use opentelemetry_sdk::propagation::TraceContextPropagator;
25use opentelemetry_sdk::trace::Sampler;
26use opentelemetry_semantic_conventions::resource;
27use serde::{Deserialize, Serialize};
28use tracing_appender::non_blocking::WorkerGuard;
29use tracing_appender::rolling::{RollingFileAppender, Rotation};
30use tracing_log::LogTracer;
31use tracing_subscriber::filter::{FilterFn, Targets};
32use tracing_subscriber::fmt::Layer;
33use tracing_subscriber::layer::SubscriberExt;
34use tracing_subscriber::prelude::*;
35use tracing_subscriber::{filter, EnvFilter, Registry};
36
37use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
38
/// Collector endpoint used when OTLP export runs over gRPC and no endpoint is configured.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";

/// Collector endpoint used when OTLP export runs over HTTP and no endpoint is configured.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318";

/// Conventional name of the log directory (not referenced by the setup code in this module).
pub const DEFAULT_LOGGING_DIR: &str = "logs";
47
48pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
50 OnceCell::new();
51
52#[derive(Clone, Debug, Serialize, Deserialize)]
54#[serde(default)]
55pub struct LoggingOptions {
56 pub dir: String,
58
59 pub level: Option<String>,
61
62 pub log_format: LogFormat,
64
65 pub max_log_files: usize,
67
68 pub append_stdout: bool,
70
71 pub enable_otlp_tracing: bool,
73
74 pub otlp_endpoint: Option<String>,
76
77 pub tracing_sample_ratio: Option<TracingSampleOptions>,
79
80 pub otlp_export_protocol: Option<OtlpExportProtocol>,
82}
83
84#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
86#[serde(rename_all = "snake_case")]
87pub enum OtlpExportProtocol {
88 Grpc,
90
91 Http,
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
97pub struct SlowQueryOptions {
98 pub enable: bool,
100
101 pub record_type: SlowQueriesRecordType,
103
104 #[serde(with = "humantime_serde")]
106 pub threshold: Option<Duration>,
107
108 pub sample_ratio: Option<f64>,
110
111 pub ttl: Option<String>,
114}
115
116impl Default for SlowQueryOptions {
117 fn default() -> Self {
118 Self {
119 enable: true,
120 record_type: SlowQueriesRecordType::SystemTable,
121 threshold: Some(Duration::from_secs(30)),
122 sample_ratio: Some(1.0),
123 ttl: Some("30d".to_string()),
124 }
125 }
126}
127
128#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
129#[serde(rename_all = "snake_case")]
130pub enum SlowQueriesRecordType {
131 SystemTable,
132 Log,
133}
134
135#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
136#[serde(rename_all = "snake_case")]
137pub enum LogFormat {
138 Json,
139 Text,
140}
141
142impl PartialEq for LoggingOptions {
143 fn eq(&self, other: &Self) -> bool {
144 self.dir == other.dir
145 && self.level == other.level
146 && self.enable_otlp_tracing == other.enable_otlp_tracing
147 && self.otlp_endpoint == other.otlp_endpoint
148 && self.tracing_sample_ratio == other.tracing_sample_ratio
149 && self.append_stdout == other.append_stdout
150 }
151}
152
153impl Eq for LoggingOptions {}
154
155impl Default for LoggingOptions {
156 fn default() -> Self {
157 Self {
158 dir: "".to_string(),
160 level: None,
161 log_format: LogFormat::Text,
162 enable_otlp_tracing: false,
163 otlp_endpoint: None,
164 tracing_sample_ratio: None,
165 append_stdout: true,
166 max_log_files: 720,
168 otlp_export_protocol: None,
169 }
170 }
171}
172
173#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
174pub struct TracingOptions {
175 #[cfg(feature = "tokio-console")]
176 pub tokio_console_addr: Option<String>,
177}
178
179pub fn init_default_ut_logging() {
182 static START: Once = Once::new();
183
184 START.call_once(|| {
185 let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
186
187 let dir =
192 env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
193
194 let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
195 "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
196 );
197 let opts = LoggingOptions {
198 dir: dir.clone(),
199 level: Some(level),
200 ..Default::default()
201 };
202 *g = Some(init_global_logging(
203 "unittest",
204 &opts,
205 &TracingOptions::default(),
206 None,
207 None,
208 ));
209
210 crate::info!("logs dir = {}", dir);
211 });
212}
213
214static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
215 Lazy::new(|| Arc::new(Mutex::new(None)));
216
/// Fallback filter directives, used when neither `LoggingOptions::level` nor the
/// `EnvFilter::DEFAULT_ENV` environment variable provides any.
const DEFAULT_LOG_TARGETS: &str = "info";
218
219#[allow(clippy::print_stdout)]
220pub fn init_global_logging(
221 app_name: &str,
222 opts: &LoggingOptions,
223 tracing_opts: &TracingOptions,
224 node_id: Option<String>,
225 slow_query_opts: Option<&SlowQueryOptions>,
226) -> Vec<WorkerGuard> {
227 static START: Once = Once::new();
228 let mut guards = vec![];
229
230 START.call_once(|| {
231 LogTracer::init().expect("log tracer must be valid");
233
234 let stdout_logging_layer = if opts.append_stdout {
236 let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
237 guards.push(guard);
238
239 if opts.log_format == LogFormat::Json {
240 Some(
241 Layer::new()
242 .json()
243 .with_writer(writer)
244 .with_ansi(std::io::stdout().is_terminal())
245 .boxed(),
246 )
247 } else {
248 Some(
249 Layer::new()
250 .with_writer(writer)
251 .with_ansi(std::io::stdout().is_terminal())
252 .boxed(),
253 )
254 }
255 } else {
256 None
257 };
258
259 let file_logging_layer = if !opts.dir.is_empty() {
261 let rolling_appender = RollingFileAppender::builder()
262 .rotation(Rotation::HOURLY)
263 .filename_prefix("greptimedb")
264 .max_log_files(opts.max_log_files)
265 .build(&opts.dir)
266 .unwrap_or_else(|e| {
267 panic!(
268 "initializing rolling file appender at {} failed: {}",
269 &opts.dir, e
270 )
271 });
272 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
273 guards.push(guard);
274
275 if opts.log_format == LogFormat::Json {
276 Some(
277 Layer::new()
278 .json()
279 .with_writer(writer)
280 .with_ansi(false)
281 .boxed(),
282 )
283 } else {
284 Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
285 }
286 } else {
287 None
288 };
289
290 let err_file_logging_layer = if !opts.dir.is_empty() {
292 let rolling_appender = RollingFileAppender::builder()
293 .rotation(Rotation::HOURLY)
294 .filename_prefix("greptimedb-err")
295 .max_log_files(opts.max_log_files)
296 .build(&opts.dir)
297 .unwrap_or_else(|e| {
298 panic!(
299 "initializing rolling file appender at {} failed: {}",
300 &opts.dir, e
301 )
302 });
303 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
304 guards.push(guard);
305
306 if opts.log_format == LogFormat::Json {
307 Some(
308 Layer::new()
309 .json()
310 .with_writer(writer)
311 .with_ansi(false)
312 .with_filter(filter::LevelFilter::ERROR)
313 .boxed(),
314 )
315 } else {
316 Some(
317 Layer::new()
318 .with_writer(writer)
319 .with_ansi(false)
320 .with_filter(filter::LevelFilter::ERROR)
321 .boxed(),
322 )
323 }
324 } else {
325 None
326 };
327
328 let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
329
330 let filter = opts
335 .level
336 .as_deref()
337 .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
338 .unwrap_or(DEFAULT_LOG_TARGETS)
339 .parse::<filter::Targets>()
340 .expect("error parsing log level string");
341
342 let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
343
344 RELOAD_HANDLE
345 .set(reload_handle)
346 .expect("reload handle already set, maybe init_global_logging get called twice?");
347
348 #[cfg(feature = "tokio-console")]
351 let subscriber = {
352 let tokio_console_layer =
353 if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
354 let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
355 panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
356 });
357 println!("tokio-console listening on {addr}");
358
359 Some(
360 console_subscriber::ConsoleLayer::builder()
361 .server_addr(addr)
362 .spawn(),
363 )
364 } else {
365 None
366 };
367
368 Registry::default()
369 .with(dyn_filter)
370 .with(tokio_console_layer)
371 .with(stdout_logging_layer)
372 .with(file_logging_layer)
373 .with(err_file_logging_layer)
374 .with(slow_query_logging_layer)
375 };
376
377 let _ = tracing_opts;
379
380 #[cfg(not(feature = "tokio-console"))]
381 let subscriber = Registry::default()
382 .with(dyn_filter)
383 .with(stdout_logging_layer)
384 .with(file_logging_layer)
385 .with(err_file_logging_layer)
386 .with(slow_query_logging_layer);
387
388 if opts.enable_otlp_tracing {
389 global::set_text_map_propagator(TraceContextPropagator::new());
390
391 let sampler = opts
392 .tracing_sample_ratio
393 .as_ref()
394 .map(create_sampler)
395 .map(Sampler::ParentBased)
396 .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
397
398 let trace_config = opentelemetry_sdk::trace::config()
399 .with_sampler(sampler)
400 .with_resource(opentelemetry_sdk::Resource::new(vec![
401 KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
402 KeyValue::new(
403 resource::SERVICE_INSTANCE_ID,
404 node_id.unwrap_or("none".to_string()),
405 ),
406 KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
407 KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
408 ]));
409
410 let tracer = opentelemetry_otlp::new_pipeline()
411 .tracing()
412 .with_exporter(build_otlp_exporter(opts))
413 .with_trace_config(trace_config)
414 .install_batch(opentelemetry_sdk::runtime::Tokio)
415 .expect("otlp tracer install failed");
416
417 tracing::subscriber::set_global_default(
418 subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
419 )
420 .expect("error setting global tracing subscriber");
421 } else {
422 tracing::subscriber::set_global_default(subscriber)
423 .expect("error setting global tracing subscriber");
424 }
425 });
426
427 guards
428}
429
430fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporterBuilder {
431 let protocol = opts
432 .otlp_export_protocol
433 .clone()
434 .unwrap_or(OtlpExportProtocol::Http);
435
436 let endpoint = opts
437 .otlp_endpoint
438 .as_ref()
439 .map(|e| {
440 if e.starts_with("http") {
441 e.to_string()
442 } else {
443 format!("http://{}", e)
444 }
445 })
446 .unwrap_or_else(|| match protocol {
447 OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
448 OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
449 });
450
451 match protocol {
452 OtlpExportProtocol::Grpc => SpanExporterBuilder::Tonic(
453 opentelemetry_otlp::new_exporter()
454 .tonic()
455 .with_endpoint(endpoint),
456 ),
457 OtlpExportProtocol::Http => SpanExporterBuilder::Http(
458 opentelemetry_otlp::new_exporter()
459 .http()
460 .with_endpoint(endpoint)
461 .with_protocol(Protocol::HttpBinary),
462 ),
463 }
464}
465
466fn build_slow_query_logger<S>(
467 opts: &LoggingOptions,
468 slow_query_opts: Option<&SlowQueryOptions>,
469 guards: &mut Vec<WorkerGuard>,
470) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
471where
472 S: tracing::Subscriber
473 + Send
474 + 'static
475 + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
476{
477 if let Some(slow_query_opts) = slow_query_opts {
478 if !opts.dir.is_empty()
479 && slow_query_opts.enable
480 && slow_query_opts.record_type == SlowQueriesRecordType::Log
481 {
482 let rolling_appender = RollingFileAppender::builder()
483 .rotation(Rotation::HOURLY)
484 .filename_prefix("greptimedb-slow-queries")
485 .max_log_files(opts.max_log_files)
486 .build(&opts.dir)
487 .unwrap_or_else(|e| {
488 panic!(
489 "initializing rolling file appender at {} failed: {}",
490 &opts.dir, e
491 )
492 });
493 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
494 guards.push(guard);
495
496 let slow_query_filter = FilterFn::new(|metadata| {
498 metadata
499 .fields()
500 .iter()
501 .any(|field| field.name().contains("slow"))
502 });
503
504 if opts.log_format == LogFormat::Json {
505 Some(
506 Layer::new()
507 .json()
508 .with_writer(writer)
509 .with_ansi(false)
510 .with_filter(slow_query_filter)
511 .boxed(),
512 )
513 } else {
514 Some(
515 Layer::new()
516 .with_writer(writer)
517 .with_ansi(false)
518 .with_filter(slow_query_filter)
519 .boxed(),
520 )
521 }
522 } else {
523 None
524 }
525 } else {
526 None
527 }
528}