common_telemetry/logging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Logging utilities, inspired by Databend.
use std::collections::HashMap;
use std::env;
use std::io::IsTerminal;
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;

use common_base::serde::empty_string_as_default;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_semantic_conventions::resource;
use serde::{Deserialize, Serialize};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_log::LogTracer;
use tracing_subscriber::filter::{FilterFn, Targets};
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};

use crate::tracing_sampler::{create_sampler, TracingSampleOptions};

/// The default endpoint when using the gRPC exporter protocol.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";

/// The default endpoint when using the HTTP exporter protocol.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318/v1/traces";

/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";

/// Handle for reloading the log level at runtime.
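///
/// # Example
///
/// A minimal sketch of a runtime level change, assuming the handle has already been
/// set by `init_global_logging` (the filter string is illustrative):
///
/// ```ignore
/// use tracing_subscriber::filter::Targets;
///
/// if let Some(handle) = RELOAD_HANDLE.get() {
///     // Swap in a new set of level targets.
///     let _ = handle.reload("debug,hyper=warn".parse::<Targets>().unwrap());
/// }
/// ```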
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
    OnceCell::new();

/// The logging options used to initialize the logger.
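///
/// # Example
///
/// A minimal construction sketch (the field values are illustrative):
///
/// ```ignore
/// let opts = LoggingOptions {
///     dir: "/path/to/logs".to_string(),
///     level: Some("info".to_string()),
///     ..Default::default()
/// };
/// ```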
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
    /// The directory to store log files. If not set, logs will be written to stdout.
    pub dir: String,

    /// The log level that can be one of "trace", "debug", "info", "warn", "error". Default is "info".
    pub level: Option<String>,

    /// The log format that can be one of "json" or "text". Default is "text".
    #[serde(default, deserialize_with = "empty_string_as_default")]
    pub log_format: LogFormat,

    /// The maximum number of log files to keep.
    pub max_log_files: usize,

    /// Whether to append logs to stdout. Default is true.
    pub append_stdout: bool,

    /// Whether to enable tracing with OTLP. Default is false.
    pub enable_otlp_tracing: bool,

    /// The endpoint of OTLP.
    pub otlp_endpoint: Option<String>,

    /// The tracing sample ratio.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,

    /// The protocol of OTLP export.
    pub otlp_export_protocol: Option<OtlpExportProtocol>,

    /// Additional HTTP headers for the OTLP exporter.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub otlp_headers: HashMap<String, String>,
}

/// The protocol of OTLP export.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
    /// gRPC protocol.
    Grpc,

    /// HTTP protocol with binary protobuf.
    Http,
}

/// The options for slow query logging.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SlowQueryOptions {
    /// Whether to enable slow query logging.
    pub enable: bool,

    /// The record type of slow queries.
    #[serde(deserialize_with = "empty_string_as_default")]
    pub record_type: SlowQueriesRecordType,

    /// The threshold of slow queries.
    #[serde(with = "humantime_serde")]
    pub threshold: Duration,

    /// The sample ratio of slow queries.
    pub sample_ratio: f64,

    /// The TTL of the `slow_queries` system table. Default is "90d".
    /// It's used when `record_type` is `SystemTable`.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
}

impl Default for SlowQueryOptions {
    fn default() -> Self {
        Self {
            enable: true,
            record_type: SlowQueriesRecordType::SystemTable,
            threshold: Duration::from_secs(30),
            sample_ratio: 1.0,
            ttl: Duration::from_secs(60 * 60 * 24 * 90), // 90 days
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SlowQueriesRecordType {
    /// Record the slow query in the system table.
    #[default]
    SystemTable,
    /// Record the slow query in a dedicated log file.
    Log,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
    Json,
    #[default]
    Text,
}

impl PartialEq for LoggingOptions {
    fn eq(&self, other: &Self) -> bool {
        self.dir == other.dir
            && self.level == other.level
            && self.enable_otlp_tracing == other.enable_otlp_tracing
            && self.otlp_endpoint == other.otlp_endpoint
            && self.tracing_sample_ratio == other.tracing_sample_ratio
            && self.append_stdout == other.append_stdout
    }
}

impl Eq for LoggingOptions {}

impl Default for LoggingOptions {
    fn default() -> Self {
        Self {
            // The directory path will be configured at application startup, typically using the data home directory as a base.
            dir: "".to_string(),
            level: None,
            log_format: LogFormat::Text,
            enable_otlp_tracing: false,
            otlp_endpoint: None,
            tracing_sample_ratio: None,
            append_stdout: true,
            // Hourly rotation, 24 files per day: keeps log files for 30 days.
            max_log_files: 720,
            otlp_export_protocol: None,
            otlp_headers: HashMap::new(),
        }
    }
}

#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TracingOptions {
    #[cfg(feature = "tokio-console")]
    pub tokio_console_addr: Option<String>,
}

/// Initializes tracing for unit tests.
/// Logs are written to the directory given by the `UNITTEST_LOG_DIR` env var
/// (default `/tmp/__unittest_logs`); the level can be overridden with `UNITTEST_LOG_LEVEL`.
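///
/// # Example
///
/// A minimal usage sketch (the test body is illustrative):
///
/// ```ignore
/// #[test]
/// fn my_test() {
///     init_default_ut_logging();
///     crate::info!("test started");
/// }
/// ```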
pub fn init_default_ut_logging() {
    static START: Once = Once::new();

    START.call_once(|| {
        let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();

        // When running in GitHub Actions, the env var "UNITTEST_LOG_DIR" is set to a
        // directory other than "/tmp". This works around the small "/tmp" disk on the
        // action runners: writing test logs there could fail the job with an
        // out-of-disk-space error.
        let dir =
            env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());

        let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
        );
        let opts = LoggingOptions {
            dir: dir.clone(),
            level: Some(level),
            ..Default::default()
        };
        *g = Some(init_global_logging(
            "unittest",
            &opts,
            &TracingOptions::default(),
            None,
            None,
        ));

        crate::info!("logs dir = {}", dir);
    });
}

static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

const DEFAULT_LOG_TARGETS: &str = "info";

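/// Initializes the global tracing subscriber from the given options and returns the
/// [`WorkerGuard`]s of the non-blocking writers; the guards must be kept alive for as
/// long as logging is expected to work. The subscriber is only installed once;
/// subsequent calls return an empty guard list.
///
/// # Example
///
/// A minimal sketch (the argument values are illustrative):
///
/// ```ignore
/// let opts = LoggingOptions::default();
/// let _guards = init_global_logging(
///     "my-app",
///     &opts,
///     &TracingOptions::default(),
///     None, // node id
///     None, // slow query options
/// );
/// ```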
#[allow(clippy::print_stdout)]
pub fn init_global_logging(
    app_name: &str,
    opts: &LoggingOptions,
    tracing_opts: &TracingOptions,
    node_id: Option<String>,
    slow_query_opts: Option<&SlowQueryOptions>,
) -> Vec<WorkerGuard> {
    static START: Once = Once::new();
    let mut guards = vec![];

    START.call_once(|| {
        // Enable the log compatibility layer to convert `log` records into tracing events.
        LogTracer::init().expect("log tracer must be valid");

        // Configure the stdout logging layer.
        let stdout_logging_layer = if opts.append_stdout {
            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(std::io::stdout().is_terminal())
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(std::io::stdout().is_terminal())
                        .boxed(),
                )
            }
        } else {
            None
        };

        // Configure the file logging layer with rolling policy.
        let file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .boxed(),
                )
            } else {
                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
            }
        } else {
            None
        };

        // Configure the error file logging layer with rolling policy.
        let err_file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb-err")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            }
        } else {
            None
        };

        let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);

        // Resolve log level settings from:
        // - options from command line or config files
        // - environment variable: RUST_LOG
        // - default settings
        let filter = opts
            .level
            .as_deref()
            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
            .unwrap_or(DEFAULT_LOG_TARGETS)
            .parse::<filter::Targets>()
            .expect("error parsing log level string");

        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());

        RELOAD_HANDLE
            .set(reload_handle)
            .expect("reload handle already set; was init_global_logging called twice?");

        // Must enable the 'tokio_unstable' cfg to use this feature.
        // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
        #[cfg(feature = "tokio-console")]
        let subscriber = {
            let tokio_console_layer =
                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
                    let addr: std::net::SocketAddr =
                        tokio_console_addr.parse().unwrap_or_else(|e| {
                            panic!(
                                "Invalid binding address '{tokio_console_addr}' for tokio-console: {e}"
                            );
                        });
                    println!("tokio-console listening on {addr}");

                    Some(
                        console_subscriber::ConsoleLayer::builder()
                            .server_addr(addr)
                            .spawn(),
                    )
                } else {
                    None
                };

            Registry::default()
                .with(dyn_filter)
                .with(tokio_console_layer)
                .with(stdout_logging_layer)
                .with(file_logging_layer)
                .with(err_file_logging_layer)
                .with(slow_query_logging_layer)
        };

        // Consume the `tracing_opts` to avoid "unused" warnings.
        let _ = tracing_opts;

        #[cfg(not(feature = "tokio-console"))]
        let subscriber = Registry::default()
            .with(dyn_filter)
            .with(stdout_logging_layer)
            .with(file_logging_layer)
            .with(err_file_logging_layer)
            .with(slow_query_logging_layer);

        if opts.enable_otlp_tracing {
            global::set_text_map_propagator(TraceContextPropagator::new());

            let sampler = opts
                .tracing_sample_ratio
                .as_ref()
                .map(create_sampler)
                .map(Sampler::ParentBased)
                .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));

            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
                .with_batch_exporter(build_otlp_exporter(opts))
                .with_sampler(sampler)
                .with_resource(
                    opentelemetry_sdk::Resource::builder_empty()
                        .with_attributes([
                            KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
                            KeyValue::new(
                                resource::SERVICE_INSTANCE_ID,
                                node_id.unwrap_or("none".to_string()),
                            ),
                            KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
                            KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
                        ])
                        .build(),
                )
                .build();
            let tracer = provider.tracer("greptimedb");

            tracing::subscriber::set_global_default(
                subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
            )
            .expect("error setting global tracing subscriber");
        } else {
            tracing::subscriber::set_global_default(subscriber)
                .expect("error setting global tracing subscriber");
        }
    });

    guards
}

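/// Builds the OTLP span exporter from the logging options.
///
/// The export protocol defaults to HTTP when `otlp_export_protocol` is unset, and the
/// endpoint falls back to [`DEFAULT_OTLP_GRPC_ENDPOINT`] or [`DEFAULT_OTLP_HTTP_ENDPOINT`]
/// accordingly; an endpoint that does not start with `http` is prefixed with `http://`.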
fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter {
    let protocol = opts
        .otlp_export_protocol
        .clone()
        .unwrap_or(OtlpExportProtocol::Http);

    let endpoint = opts
        .otlp_endpoint
        .as_ref()
        .map(|e| {
            if e.starts_with("http") {
                e.to_string()
            } else {
                format!("http://{}", e)
            }
        })
        .unwrap_or_else(|| match protocol {
            OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
            OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
        });

    match protocol {
        OtlpExportProtocol::Grpc => SpanExporter::builder()
            .with_tonic()
            .with_endpoint(endpoint)
            .build()
            .expect("Failed to create OTLP gRPC exporter"),

        OtlpExportProtocol::Http => SpanExporter::builder()
            .with_http()
            .with_endpoint(endpoint)
            .with_protocol(Protocol::HttpBinary)
            .with_headers(opts.otlp_headers.clone())
            .build()
            .expect("Failed to create OTLP HTTP exporter"),
    }
}

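/// Builds the optional slow-query file logger.
///
/// A layer is returned only when a log directory is configured, slow query logging is
/// enabled, and `record_type` is [`SlowQueriesRecordType::Log`]; it writes events that
/// carry a field whose name contains `slow` to a dedicated `greptimedb-slow-queries`
/// rolling file.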
fn build_slow_query_logger<S>(
    opts: &LoggingOptions,
    slow_query_opts: Option<&SlowQueryOptions>,
    guards: &mut Vec<WorkerGuard>,
) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
where
    S: tracing::Subscriber
        + Send
        + 'static
        + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
{
    if let Some(slow_query_opts) = slow_query_opts {
        if !opts.dir.is_empty()
            && slow_query_opts.enable
            && slow_query_opts.record_type == SlowQueriesRecordType::Log
        {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb-slow-queries")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            // Only log events that carry a field whose name contains "slow".
            let slow_query_filter = FilterFn::new(|metadata| {
                metadata
                    .fields()
                    .iter()
                    .any(|field| field.name().contains("slow"))
            });

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(slow_query_filter)
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(slow_query_filter)
                        .boxed(),
                )
            }
        } else {
            None
        }
    } else {
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_logging_options_deserialization_default() {
        let json = r#"{}"#;
        let opts: LoggingOptions = serde_json::from_str(json).unwrap();

        assert_eq!(opts.log_format, LogFormat::Text);
        assert_eq!(opts.dir, "");
        assert_eq!(opts.level, None);
        assert!(opts.append_stdout);
    }

    #[test]
    fn test_logging_options_deserialization_empty_log_format() {
        let json = r#"{"log_format": ""}"#;
        let opts: LoggingOptions = serde_json::from_str(json).unwrap();

        // Empty string should use default (Text)
        assert_eq!(opts.log_format, LogFormat::Text);
    }

    #[test]
    fn test_logging_options_deserialization_valid_log_format() {
        let json_format = r#"{"log_format": "json"}"#;
        let opts: LoggingOptions = serde_json::from_str(json_format).unwrap();
        assert_eq!(opts.log_format, LogFormat::Json);

        let text_format = r#"{"log_format": "text"}"#;
        let opts: LoggingOptions = serde_json::from_str(text_format).unwrap();
        assert_eq!(opts.log_format, LogFormat::Text);
    }

    #[test]
    fn test_logging_options_deserialization_missing_log_format() {
        let json = r#"{"dir": "/tmp/logs"}"#;
        let opts: LoggingOptions = serde_json::from_str(json).unwrap();

        // Missing log_format should use default (Text)
        assert_eq!(opts.log_format, LogFormat::Text);
        assert_eq!(opts.dir, "/tmp/logs");
    }

    #[test]
    fn test_slow_query_options_deserialization_default() {
        let json = r#"{"enable": true, "threshold": "30s"}"#;
        let opts: SlowQueryOptions = serde_json::from_str(json).unwrap();

        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(opts.enable);
    }

    #[test]
    fn test_slow_query_options_deserialization_empty_record_type() {
        let json = r#"{"enable": true, "record_type": "", "threshold": "30s"}"#;
        let opts: SlowQueryOptions = serde_json::from_str(json).unwrap();

        // Empty string should use default (SystemTable)
        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(opts.enable);
    }

    #[test]
    fn test_slow_query_options_deserialization_valid_record_type() {
        let system_table_json =
            r#"{"enable": true, "record_type": "system_table", "threshold": "30s"}"#;
        let opts: SlowQueryOptions = serde_json::from_str(system_table_json).unwrap();
        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);

        let log_json = r#"{"enable": true, "record_type": "log", "threshold": "30s"}"#;
        let opts: SlowQueryOptions = serde_json::from_str(log_json).unwrap();
        assert_eq!(opts.record_type, SlowQueriesRecordType::Log);
    }

    #[test]
    fn test_slow_query_options_deserialization_missing_record_type() {
        let json = r#"{"enable": false, "threshold": "30s"}"#;
        let opts: SlowQueryOptions = serde_json::from_str(json).unwrap();

        // Missing record_type should use default (SystemTable)
        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(!opts.enable);
    }

    #[test]
    fn test_otlp_export_protocol_deserialization_valid_values() {
        let grpc_json = r#""grpc""#;
        let protocol: OtlpExportProtocol = serde_json::from_str(grpc_json).unwrap();
        assert_eq!(protocol, OtlpExportProtocol::Grpc);

        let http_json = r#""http""#;
        let protocol: OtlpExportProtocol = serde_json::from_str(http_json).unwrap();
        assert_eq!(protocol, OtlpExportProtocol::Http);
    }
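
    // An additional sketch of a check for the serde attributes on `LoggingOptions`:
    // an empty `otlp_headers` map should be skipped during serialization because of
    // `skip_serializing_if = "HashMap::is_empty"`.
    #[test]
    fn test_logging_options_serialization_skips_empty_otlp_headers() {
        let opts = LoggingOptions::default();
        let json = serde_json::to_string(&opts).unwrap();

        // The default (empty) header map must not appear in the output.
        assert!(!json.contains("otlp_headers"));
    }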
}