1use std::env;
17use std::sync::{Arc, Mutex, Once};
18use std::time::Duration;
19
20use once_cell::sync::{Lazy, OnceCell};
21use opentelemetry::{global, KeyValue};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::propagation::TraceContextPropagator;
24use opentelemetry_sdk::trace::Sampler;
25use opentelemetry_semantic_conventions::resource;
26use serde::{Deserialize, Serialize};
27use tracing_appender::non_blocking::WorkerGuard;
28use tracing_appender::rolling::{RollingFileAppender, Rotation};
29use tracing_log::LogTracer;
30use tracing_subscriber::filter::{FilterFn, Targets};
31use tracing_subscriber::fmt::Layer;
32use tracing_subscriber::layer::SubscriberExt;
33use tracing_subscriber::prelude::*;
34use tracing_subscriber::{filter, EnvFilter, Registry};
35
36use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
37
/// OTLP collector endpoint used when `LoggingOptions::otlp_endpoint` is `None`.
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
39
40pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
42 OnceCell::new();
43
44#[derive(Clone, Debug, Serialize, Deserialize)]
46#[serde(default)]
47pub struct LoggingOptions {
48 pub dir: String,
50
51 pub level: Option<String>,
53
54 pub log_format: LogFormat,
56
57 pub max_log_files: usize,
59
60 pub append_stdout: bool,
62
63 pub enable_otlp_tracing: bool,
65
66 pub otlp_endpoint: Option<String>,
68
69 pub tracing_sample_ratio: Option<TracingSampleOptions>,
71}
72
73#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
75pub struct SlowQueryOptions {
76 pub enable: bool,
78
79 pub record_type: SlowQueriesRecordType,
81
82 #[serde(with = "humantime_serde")]
84 pub threshold: Option<Duration>,
85
86 pub sample_ratio: Option<f64>,
88
89 pub ttl: Option<String>,
92}
93
94impl Default for SlowQueryOptions {
95 fn default() -> Self {
96 Self {
97 enable: true,
98 record_type: SlowQueriesRecordType::SystemTable,
99 threshold: Some(Duration::from_secs(30)),
100 sample_ratio: Some(1.0),
101 ttl: Some("30d".to_string()),
102 }
103 }
104}
105
106#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
107#[serde(rename_all = "snake_case")]
108pub enum SlowQueriesRecordType {
109 SystemTable,
110 Log,
111}
112
113#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
114#[serde(rename_all = "snake_case")]
115pub enum LogFormat {
116 Json,
117 Text,
118}
119
120impl PartialEq for LoggingOptions {
121 fn eq(&self, other: &Self) -> bool {
122 self.dir == other.dir
123 && self.level == other.level
124 && self.enable_otlp_tracing == other.enable_otlp_tracing
125 && self.otlp_endpoint == other.otlp_endpoint
126 && self.tracing_sample_ratio == other.tracing_sample_ratio
127 && self.append_stdout == other.append_stdout
128 }
129}
130
131impl Eq for LoggingOptions {}
132
133impl Default for LoggingOptions {
134 fn default() -> Self {
135 Self {
136 dir: "./greptimedb_data/logs".to_string(),
137 level: None,
138 log_format: LogFormat::Text,
139 enable_otlp_tracing: false,
140 otlp_endpoint: None,
141 tracing_sample_ratio: None,
142 append_stdout: true,
143 max_log_files: 720,
145 }
146 }
147}
148
149#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
150pub struct TracingOptions {
151 #[cfg(feature = "tokio-console")]
152 pub tokio_console_addr: Option<String>,
153}
154
155pub fn init_default_ut_logging() {
158 static START: Once = Once::new();
159
160 START.call_once(|| {
161 let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
162
163 let dir =
168 env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
169
170 let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
171 "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
172 );
173 let opts = LoggingOptions {
174 dir: dir.clone(),
175 level: Some(level),
176 ..Default::default()
177 };
178 *g = Some(init_global_logging(
179 "unittest",
180 &opts,
181 &TracingOptions::default(),
182 None,
183 None,
184 ));
185
186 crate::info!("logs dir = {}", dir);
187 });
188}
189
190static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
191 Lazy::new(|| Arc::new(Mutex::new(None)));
192
/// Filter directives used when neither `LoggingOptions::level` nor the environment
/// variable provides one.
const DEFAULT_LOG_TARGETS: &str = "info";
194
195#[allow(clippy::print_stdout)]
196pub fn init_global_logging(
197 app_name: &str,
198 opts: &LoggingOptions,
199 tracing_opts: &TracingOptions,
200 node_id: Option<String>,
201 slow_query_opts: Option<&SlowQueryOptions>,
202) -> Vec<WorkerGuard> {
203 static START: Once = Once::new();
204 let mut guards = vec![];
205
206 START.call_once(|| {
207 LogTracer::init().expect("log tracer must be valid");
209
210 let stdout_logging_layer = if opts.append_stdout {
212 let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
213 guards.push(guard);
214
215 if opts.log_format == LogFormat::Json {
216 Some(
217 Layer::new()
218 .json()
219 .with_writer(writer)
220 .with_ansi(atty::is(atty::Stream::Stdout))
221 .boxed(),
222 )
223 } else {
224 Some(
225 Layer::new()
226 .with_writer(writer)
227 .with_ansi(atty::is(atty::Stream::Stdout))
228 .boxed(),
229 )
230 }
231 } else {
232 None
233 };
234
235 let file_logging_layer = if !opts.dir.is_empty() {
237 let rolling_appender = RollingFileAppender::builder()
238 .rotation(Rotation::HOURLY)
239 .filename_prefix("greptimedb")
240 .max_log_files(opts.max_log_files)
241 .build(&opts.dir)
242 .unwrap_or_else(|e| {
243 panic!(
244 "initializing rolling file appender at {} failed: {}",
245 &opts.dir, e
246 )
247 });
248 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
249 guards.push(guard);
250
251 if opts.log_format == LogFormat::Json {
252 Some(
253 Layer::new()
254 .json()
255 .with_writer(writer)
256 .with_ansi(false)
257 .boxed(),
258 )
259 } else {
260 Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
261 }
262 } else {
263 None
264 };
265
266 let err_file_logging_layer = if !opts.dir.is_empty() {
268 let rolling_appender = RollingFileAppender::builder()
269 .rotation(Rotation::HOURLY)
270 .filename_prefix("greptimedb-err")
271 .max_log_files(opts.max_log_files)
272 .build(&opts.dir)
273 .unwrap_or_else(|e| {
274 panic!(
275 "initializing rolling file appender at {} failed: {}",
276 &opts.dir, e
277 )
278 });
279 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
280 guards.push(guard);
281
282 if opts.log_format == LogFormat::Json {
283 Some(
284 Layer::new()
285 .json()
286 .with_writer(writer)
287 .with_ansi(false)
288 .with_filter(filter::LevelFilter::ERROR)
289 .boxed(),
290 )
291 } else {
292 Some(
293 Layer::new()
294 .with_writer(writer)
295 .with_ansi(false)
296 .with_filter(filter::LevelFilter::ERROR)
297 .boxed(),
298 )
299 }
300 } else {
301 None
302 };
303
304 let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
305
306 let filter = opts
311 .level
312 .as_deref()
313 .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
314 .unwrap_or(DEFAULT_LOG_TARGETS)
315 .parse::<filter::Targets>()
316 .expect("error parsing log level string");
317
318 let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
319
320 RELOAD_HANDLE
321 .set(reload_handle)
322 .expect("reload handle already set, maybe init_global_logging get called twice?");
323
324 #[cfg(feature = "tokio-console")]
327 let subscriber = {
328 let tokio_console_layer =
329 if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
330 let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
331 panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
332 });
333 println!("tokio-console listening on {addr}");
334
335 Some(
336 console_subscriber::ConsoleLayer::builder()
337 .server_addr(addr)
338 .spawn(),
339 )
340 } else {
341 None
342 };
343
344 Registry::default()
345 .with(dyn_filter)
346 .with(tokio_console_layer)
347 .with(stdout_logging_layer)
348 .with(file_logging_layer)
349 .with(err_file_logging_layer)
350 .with(slow_query_logging_layer)
351 };
352
353 let _ = tracing_opts;
355
356 #[cfg(not(feature = "tokio-console"))]
357 let subscriber = Registry::default()
358 .with(dyn_filter)
359 .with(stdout_logging_layer)
360 .with(file_logging_layer)
361 .with(err_file_logging_layer)
362 .with(slow_query_logging_layer);
363
364 if opts.enable_otlp_tracing {
365 global::set_text_map_propagator(TraceContextPropagator::new());
366
367 let sampler = opts
368 .tracing_sample_ratio
369 .as_ref()
370 .map(create_sampler)
371 .map(Sampler::ParentBased)
372 .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
373
374 let trace_config = opentelemetry_sdk::trace::config()
375 .with_sampler(sampler)
376 .with_resource(opentelemetry_sdk::Resource::new(vec![
377 KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
378 KeyValue::new(
379 resource::SERVICE_INSTANCE_ID,
380 node_id.unwrap_or("none".to_string()),
381 ),
382 KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
383 KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
384 ]));
385
386 let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
387 opts.otlp_endpoint
388 .as_ref()
389 .map(|e| {
390 if e.starts_with("http") {
391 e.to_string()
392 } else {
393 format!("http://{}", e)
394 }
395 })
396 .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
397 );
398
399 let tracer = opentelemetry_otlp::new_pipeline()
400 .tracing()
401 .with_exporter(exporter)
402 .with_trace_config(trace_config)
403 .install_batch(opentelemetry_sdk::runtime::Tokio)
404 .expect("otlp tracer install failed");
405
406 tracing::subscriber::set_global_default(
407 subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
408 )
409 .expect("error setting global tracing subscriber");
410 } else {
411 tracing::subscriber::set_global_default(subscriber)
412 .expect("error setting global tracing subscriber");
413 }
414 });
415
416 guards
417}
418
419fn build_slow_query_logger<S>(
420 opts: &LoggingOptions,
421 slow_query_opts: Option<&SlowQueryOptions>,
422 guards: &mut Vec<WorkerGuard>,
423) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
424where
425 S: tracing::Subscriber
426 + Send
427 + 'static
428 + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
429{
430 if let Some(slow_query_opts) = slow_query_opts {
431 if !opts.dir.is_empty()
432 && slow_query_opts.enable
433 && slow_query_opts.record_type == SlowQueriesRecordType::Log
434 {
435 let rolling_appender = RollingFileAppender::builder()
436 .rotation(Rotation::HOURLY)
437 .filename_prefix("greptimedb-slow-queries")
438 .max_log_files(opts.max_log_files)
439 .build(&opts.dir)
440 .unwrap_or_else(|e| {
441 panic!(
442 "initializing rolling file appender at {} failed: {}",
443 &opts.dir, e
444 )
445 });
446 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
447 guards.push(guard);
448
449 let slow_query_filter = FilterFn::new(|metadata| {
451 metadata
452 .fields()
453 .iter()
454 .any(|field| field.name().contains("slow"))
455 });
456
457 if opts.log_format == LogFormat::Json {
458 Some(
459 Layer::new()
460 .json()
461 .with_writer(writer)
462 .with_ansi(false)
463 .with_filter(slow_query_filter)
464 .boxed(),
465 )
466 } else {
467 Some(
468 Layer::new()
469 .with_writer(writer)
470 .with_ansi(false)
471 .with_filter(slow_query_filter)
472 .boxed(),
473 )
474 }
475 } else {
476 None
477 }
478 } else {
479 None
480 }
481}