1use std::env;
17use std::sync::{Arc, Mutex, Once};
18use std::time::Duration;
19
20use once_cell::sync::{Lazy, OnceCell};
21use opentelemetry::{global, KeyValue};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::propagation::TraceContextPropagator;
24use opentelemetry_sdk::trace::Sampler;
25use opentelemetry_semantic_conventions::resource;
26use serde::{Deserialize, Serialize};
27use tracing_appender::non_blocking::WorkerGuard;
28use tracing_appender::rolling::{RollingFileAppender, Rotation};
29use tracing_log::LogTracer;
30use tracing_subscriber::filter::{FilterFn, Targets};
31use tracing_subscriber::fmt::Layer;
32use tracing_subscriber::layer::SubscriberExt;
33use tracing_subscriber::prelude::*;
34use tracing_subscriber::{filter, EnvFilter, Registry};
35
36use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
37
/// OTLP endpoint used by `init_global_logging` when `LoggingOptions::otlp_endpoint` is `None`.
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
39
40pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
42 OnceCell::new();
43
44#[derive(Clone, Debug, Serialize, Deserialize)]
46#[serde(default)]
47pub struct LoggingOptions {
48 pub dir: String,
50
51 pub level: Option<String>,
53
54 pub log_format: LogFormat,
56
57 pub max_log_files: usize,
59
60 pub append_stdout: bool,
62
63 pub enable_otlp_tracing: bool,
65
66 pub otlp_endpoint: Option<String>,
68
69 pub tracing_sample_ratio: Option<TracingSampleOptions>,
71
72 pub slow_query: SlowQueryOptions,
74}
75
76#[derive(Clone, Debug, Serialize, Deserialize, Default)]
78#[serde(default)]
79pub struct SlowQueryOptions {
80 pub enable: bool,
82
83 #[serde(with = "humantime_serde")]
85 pub threshold: Option<Duration>,
86
87 pub sample_ratio: Option<f64>,
89}
90
91#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub enum LogFormat {
94 Json,
95 Text,
96}
97
98impl PartialEq for LoggingOptions {
99 fn eq(&self, other: &Self) -> bool {
100 self.dir == other.dir
101 && self.level == other.level
102 && self.enable_otlp_tracing == other.enable_otlp_tracing
103 && self.otlp_endpoint == other.otlp_endpoint
104 && self.tracing_sample_ratio == other.tracing_sample_ratio
105 && self.append_stdout == other.append_stdout
106 }
107}
108
109impl Eq for LoggingOptions {}
110
111impl Default for LoggingOptions {
112 fn default() -> Self {
113 Self {
114 dir: "./greptimedb_data/logs".to_string(),
115 level: None,
116 log_format: LogFormat::Text,
117 enable_otlp_tracing: false,
118 otlp_endpoint: None,
119 tracing_sample_ratio: None,
120 append_stdout: true,
121 slow_query: SlowQueryOptions::default(),
122 max_log_files: 720,
124 }
125 }
126}
127
128#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
129pub struct TracingOptions {
130 #[cfg(feature = "tokio-console")]
131 pub tokio_console_addr: Option<String>,
132}
133
134pub fn init_default_ut_logging() {
137 static START: Once = Once::new();
138
139 START.call_once(|| {
140 let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
141
142 let dir =
147 env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
148
149 let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
150 "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
151 );
152 let opts = LoggingOptions {
153 dir: dir.clone(),
154 level: Some(level),
155 ..Default::default()
156 };
157 *g = Some(init_global_logging(
158 "unittest",
159 &opts,
160 &TracingOptions::default(),
161 None
162 ));
163
164 crate::info!("logs dir = {}", dir);
165 });
166}
167
168static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
169 Lazy::new(|| Arc::new(Mutex::new(None)));
170
/// Filter directives used when neither `LoggingOptions::level` nor the
/// `EnvFilter::DEFAULT_ENV` environment variable provides any.
const DEFAULT_LOG_TARGETS: &str = "info";
172
173#[allow(clippy::print_stdout)]
174pub fn init_global_logging(
175 app_name: &str,
176 opts: &LoggingOptions,
177 tracing_opts: &TracingOptions,
178 node_id: Option<String>,
179) -> Vec<WorkerGuard> {
180 static START: Once = Once::new();
181 let mut guards = vec![];
182
183 START.call_once(|| {
184 LogTracer::init().expect("log tracer must be valid");
186
187 let stdout_logging_layer = if opts.append_stdout {
189 let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
190 guards.push(guard);
191
192 if opts.log_format == LogFormat::Json {
193 Some(
194 Layer::new()
195 .json()
196 .with_writer(writer)
197 .with_ansi(atty::is(atty::Stream::Stdout))
198 .boxed(),
199 )
200 } else {
201 Some(
202 Layer::new()
203 .with_writer(writer)
204 .with_ansi(atty::is(atty::Stream::Stdout))
205 .boxed(),
206 )
207 }
208 } else {
209 None
210 };
211
212 let file_logging_layer = if !opts.dir.is_empty() {
214 let rolling_appender = RollingFileAppender::builder()
215 .rotation(Rotation::HOURLY)
216 .filename_prefix("greptimedb")
217 .max_log_files(opts.max_log_files)
218 .build(&opts.dir)
219 .unwrap_or_else(|e| {
220 panic!(
221 "initializing rolling file appender at {} failed: {}",
222 &opts.dir, e
223 )
224 });
225 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
226 guards.push(guard);
227
228 if opts.log_format == LogFormat::Json {
229 Some(
230 Layer::new()
231 .json()
232 .with_writer(writer)
233 .with_ansi(false)
234 .boxed(),
235 )
236 } else {
237 Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
238 }
239 } else {
240 None
241 };
242
243 let err_file_logging_layer = if !opts.dir.is_empty() {
245 let rolling_appender = RollingFileAppender::builder()
246 .rotation(Rotation::HOURLY)
247 .filename_prefix("greptimedb-err")
248 .max_log_files(opts.max_log_files)
249 .build(&opts.dir)
250 .unwrap_or_else(|e| {
251 panic!(
252 "initializing rolling file appender at {} failed: {}",
253 &opts.dir, e
254 )
255 });
256 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
257 guards.push(guard);
258
259 if opts.log_format == LogFormat::Json {
260 Some(
261 Layer::new()
262 .json()
263 .with_writer(writer)
264 .with_ansi(false)
265 .with_filter(filter::LevelFilter::ERROR)
266 .boxed(),
267 )
268 } else {
269 Some(
270 Layer::new()
271 .with_writer(writer)
272 .with_ansi(false)
273 .with_filter(filter::LevelFilter::ERROR)
274 .boxed(),
275 )
276 }
277 } else {
278 None
279 };
280
281 let slow_query_logging_layer = if !opts.dir.is_empty() && opts.slow_query.enable {
282 let rolling_appender = RollingFileAppender::builder()
283 .rotation(Rotation::HOURLY)
284 .filename_prefix("greptimedb-slow-queries")
285 .max_log_files(opts.max_log_files)
286 .build(&opts.dir)
287 .unwrap_or_else(|e| {
288 panic!(
289 "initializing rolling file appender at {} failed: {}",
290 &opts.dir, e
291 )
292 });
293 let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
294 guards.push(guard);
295
296 let slow_query_filter = FilterFn::new(|metadata| {
298 metadata
299 .fields()
300 .iter()
301 .any(|field| field.name().contains("slow"))
302 });
303
304 if opts.log_format == LogFormat::Json {
305 Some(
306 Layer::new()
307 .json()
308 .with_writer(writer)
309 .with_ansi(false)
310 .with_filter(slow_query_filter)
311 .boxed(),
312 )
313 } else {
314 Some(
315 Layer::new()
316 .with_writer(writer)
317 .with_ansi(false)
318 .with_filter(slow_query_filter)
319 .boxed(),
320 )
321 }
322 } else {
323 None
324 };
325
326 let filter = opts
331 .level
332 .as_deref()
333 .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
334 .unwrap_or(DEFAULT_LOG_TARGETS)
335 .parse::<filter::Targets>()
336 .expect("error parsing log level string");
337
338 let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
339
340 RELOAD_HANDLE
341 .set(reload_handle)
342 .expect("reload handle already set, maybe init_global_logging get called twice?");
343
344 #[cfg(feature = "tokio-console")]
347 let subscriber = {
348 let tokio_console_layer =
349 if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
350 let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
351 panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
352 });
353 println!("tokio-console listening on {addr}");
354
355 Some(
356 console_subscriber::ConsoleLayer::builder()
357 .server_addr(addr)
358 .spawn(),
359 )
360 } else {
361 None
362 };
363
364 Registry::default()
365 .with(dyn_filter)
366 .with(tokio_console_layer)
367 .with(stdout_logging_layer)
368 .with(file_logging_layer)
369 .with(err_file_logging_layer)
370 .with(slow_query_logging_layer)
371 };
372
373 let _ = tracing_opts;
375
376 #[cfg(not(feature = "tokio-console"))]
377 let subscriber = Registry::default()
378 .with(dyn_filter)
379 .with(stdout_logging_layer)
380 .with(file_logging_layer)
381 .with(err_file_logging_layer)
382 .with(slow_query_logging_layer);
383
384 if opts.enable_otlp_tracing {
385 global::set_text_map_propagator(TraceContextPropagator::new());
386
387 let sampler = opts
388 .tracing_sample_ratio
389 .as_ref()
390 .map(create_sampler)
391 .map(Sampler::ParentBased)
392 .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
393
394 let trace_config = opentelemetry_sdk::trace::config()
395 .with_sampler(sampler)
396 .with_resource(opentelemetry_sdk::Resource::new(vec![
397 KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
398 KeyValue::new(
399 resource::SERVICE_INSTANCE_ID,
400 node_id.unwrap_or("none".to_string()),
401 ),
402 KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
403 KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
404 ]));
405
406 let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
407 opts.otlp_endpoint
408 .as_ref()
409 .map(|e| {
410 if e.starts_with("http") {
411 e.to_string()
412 } else {
413 format!("http://{}", e)
414 }
415 })
416 .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
417 );
418
419 let tracer = opentelemetry_otlp::new_pipeline()
420 .tracing()
421 .with_exporter(exporter)
422 .with_trace_config(trace_config)
423 .install_batch(opentelemetry_sdk::runtime::Tokio)
424 .expect("otlp tracer install failed");
425
426 tracing::subscriber::set_global_default(
427 subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
428 )
429 .expect("error setting global tracing subscriber");
430 } else {
431 tracing::subscriber::set_global_default(subscriber)
432 .expect("error setting global tracing subscriber");
433 }
434 });
435
436 guards
437}