// common_telemetry/logging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Logging utilities, inspired by Databend.
16use std::env;
17use std::sync::{Arc, Mutex, Once};
18use std::time::Duration;
19
20use once_cell::sync::{Lazy, OnceCell};
21use opentelemetry::{global, KeyValue};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::propagation::TraceContextPropagator;
24use opentelemetry_sdk::trace::Sampler;
25use opentelemetry_semantic_conventions::resource;
26use serde::{Deserialize, Serialize};
27use tracing_appender::non_blocking::WorkerGuard;
28use tracing_appender::rolling::{RollingFileAppender, Rotation};
29use tracing_log::LogTracer;
30use tracing_subscriber::filter::{FilterFn, Targets};
31use tracing_subscriber::fmt::Layer;
32use tracing_subscriber::layer::SubscriberExt;
33use tracing_subscriber::prelude::*;
34use tracing_subscriber::{filter, EnvFilter, Registry};
35
36use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
37
/// Default OTLP gRPC endpoint, used when `LoggingOptions::otlp_endpoint` is not set.
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

// Handle for reloading log level
// Set exactly once in `init_global_logging`; afterwards it can be used to
// swap the active `Targets` filter at runtime without restarting the process.
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
    OnceCell::new();
43
/// The logging options used to initialize the logger.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
    /// The directory to store log files. If empty, the file logging layers
    /// are disabled and logs only go to stdout (when `append_stdout` is true).
    pub dir: String,

    /// The log level that can be one of "trace", "debug", "info", "warn", "error". Default is "info".
    /// Also accepts per-target directives such as "h2=info" (parsed as `tracing` `Targets`).
    pub level: Option<String>,

    /// The log format that can be one of "json" or "text". Default is "text".
    pub log_format: LogFormat,

    /// The maximum number of log files kept by each rolling appender.
    /// Default is 720 (hourly rotation, roughly 30 days).
    pub max_log_files: usize,

    /// Whether to append logs to stdout. Default is true.
    pub append_stdout: bool,

    /// Whether to enable tracing with OTLP. Default is false.
    pub enable_otlp_tracing: bool,

    /// The endpoint of OTLP. Default is "http://localhost:4317".
    pub otlp_endpoint: Option<String>,

    /// The tracing sample ratio; when unset, parent-based always-on sampling is used.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,
}
72
/// The options of slow query.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SlowQueryOptions {
    /// Whether to enable slow query log.
    pub enable: bool,

    /// The record type of slow queries (system table or dedicated log file).
    pub record_type: SlowQueriesRecordType,

    /// The threshold of slow queries.
    /// Serialized in humantime form (e.g. "30s").
    #[serde(with = "humantime_serde")]
    pub threshold: Option<Duration>,

    /// The sample ratio of slow queries.
    pub sample_ratio: Option<f64>,

    /// The table TTL of `slow_queries` system table. Default is "30d".
    /// It's used when `record_type` is `SystemTable`.
    pub ttl: Option<String>,
}
93
impl Default for SlowQueryOptions {
    /// Slow-query logging is on by default: queries slower than 30 seconds
    /// are sampled at 100% and recorded to the `slow_queries` system table
    /// with a 30-day TTL.
    fn default() -> Self {
        Self {
            enable: true,
            record_type: SlowQueriesRecordType::SystemTable,
            threshold: Some(Duration::from_secs(30)),
            sample_ratio: Some(1.0),
            ttl: Some("30d".to_string()),
        }
    }
}
105
/// Where slow queries are recorded.
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SlowQueriesRecordType {
    /// Record slow queries into the `slow_queries` system table.
    SystemTable,
    /// Emit slow queries to a dedicated rolling log file
    /// (see `build_slow_query_logger`).
    Log,
}
112
/// The output format of log records.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
    /// One JSON object per log record.
    Json,
    /// Human-readable plain text (the default).
    Text,
}
119
120impl PartialEq for LoggingOptions {
121    fn eq(&self, other: &Self) -> bool {
122        self.dir == other.dir
123            && self.level == other.level
124            && self.enable_otlp_tracing == other.enable_otlp_tracing
125            && self.otlp_endpoint == other.otlp_endpoint
126            && self.tracing_sample_ratio == other.tracing_sample_ratio
127            && self.append_stdout == other.append_stdout
128    }
129}
130
131impl Eq for LoggingOptions {}
132
impl Default for LoggingOptions {
    /// Defaults: text logs to stdout and to `./greptimedb_data/logs`,
    /// OTLP tracing disabled, filter level left unset (resolved later
    /// against `RUST_LOG` and `DEFAULT_LOG_TARGETS`).
    fn default() -> Self {
        Self {
            dir: "./greptimedb_data/logs".to_string(),
            level: None,
            log_format: LogFormat::Text,
            enable_otlp_tracing: false,
            otlp_endpoint: None,
            tracing_sample_ratio: None,
            append_stdout: true,
            // Rotation hourly, 24 files per day, keeps info log files of 30 days
            max_log_files: 720,
        }
    }
}
148
/// Options for extra tracing integrations.
#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TracingOptions {
    /// Bind address for the `tokio-console` server; only present when the
    /// `tokio-console` feature is enabled.
    #[cfg(feature = "tokio-console")]
    pub tokio_console_addr: Option<String>,
}
154
155/// Init tracing for unittest.
156/// Write logs to file `unittest`.
157pub fn init_default_ut_logging() {
158    static START: Once = Once::new();
159
160    START.call_once(|| {
161        let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
162
163        // When running in Github's actions, env "UNITTEST_LOG_DIR" is set to a directory other
164        // than "/tmp".
165        // This is to fix the problem that the "/tmp" disk space of action runner's is small,
166        // if we write testing logs in it, actions would fail due to disk out of space error.
167        let dir =
168            env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
169
170        let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
171            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
172        );
173        let opts = LoggingOptions {
174            dir: dir.clone(),
175            level: Some(level),
176            ..Default::default()
177        };
178        *g = Some(init_global_logging(
179            "unittest",
180            &opts,
181            &TracingOptions::default(),
182            None,
183            None,
184        ));
185
186        crate::info!("logs dir = {}", dir);
187    });
188}
189
// Holds the worker guards of the unit-test logger for the whole test process,
// keeping the non-blocking appenders flushing until exit.
static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

/// Fallback filter directive used when neither `opts.level` nor `RUST_LOG` is set.
const DEFAULT_LOG_TARGETS: &str = "info";
194
/// Initialize the global logging / tracing stack and return the
/// [`WorkerGuard`]s of the non-blocking writers.
///
/// Callers must keep the returned guards alive for the process lifetime,
/// otherwise buffered log lines may be lost when the writers are dropped.
///
/// Layers assembled, each optional and driven by `opts`:
/// - stdout layer (`opts.append_stdout`)
/// - rolling file layer and error-only rolling file layer (`opts.dir` non-empty)
/// - slow-query file layer (see `build_slow_query_logger`)
/// - OTLP tracing layer (`opts.enable_otlp_tracing`)
///
/// NOTE: the body is guarded by a `Once`; on every call after the first,
/// nothing is initialized and the returned `Vec` is empty.
#[allow(clippy::print_stdout)]
pub fn init_global_logging(
    app_name: &str,
    opts: &LoggingOptions,
    tracing_opts: &TracingOptions,
    node_id: Option<String>,
    slow_query_opts: Option<&SlowQueryOptions>,
) -> Vec<WorkerGuard> {
    static START: Once = Once::new();
    let mut guards = vec![];

    START.call_once(|| {
        // Enable log compatible layer to convert log record to tracing span.
        LogTracer::init().expect("log tracer must be valid");

        // Configure the stdout logging layer.
        let stdout_logging_layer = if opts.append_stdout {
            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        // Colorize only when stdout is an interactive terminal.
                        .with_ansi(atty::is(atty::Stream::Stdout))
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(atty::is(atty::Stream::Stdout))
                        .boxed(),
                )
            }
        } else {
            None
        };

        // Configure the file logging layer with rolling policy.
        // Files are named "greptimedb*" and rotated hourly, keeping at most
        // `opts.max_log_files` of them.
        let file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .boxed(),
                )
            } else {
                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
            }
        } else {
            None
        };

        // Configure the error file logging layer with rolling policy.
        // Mirrors `file_logging_layer` but with a "greptimedb-err" prefix and
        // an ERROR-level filter, so failures can be scanned in isolation.
        let err_file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb-err")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            }
        } else {
            None
        };

        let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);

        // resolve log level settings from:
        // - options from command line or config files
        // - environment variable: RUST_LOG
        // - default settings
        let filter = opts
            .level
            .as_deref()
            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
            .unwrap_or(DEFAULT_LOG_TARGETS)
            .parse::<filter::Targets>()
            .expect("error parsing log level string");

        // Wrap the filter in a reload layer so the effective level can later
        // be changed at runtime through `RELOAD_HANDLE`.
        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());

        RELOAD_HANDLE
            .set(reload_handle)
            .expect("reload handle already set, maybe init_global_logging get called twice?");

        // Must enable 'tokio_unstable' cfg to use this feature.
        // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
        #[cfg(feature = "tokio-console")]
        let subscriber = {
            let tokio_console_layer =
                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
                    let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
                    panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
                });
                    println!("tokio-console listening on {addr}");

                    Some(
                        console_subscriber::ConsoleLayer::builder()
                            .server_addr(addr)
                            .spawn(),
                    )
                } else {
                    None
                };

            Registry::default()
                .with(dyn_filter)
                .with(tokio_console_layer)
                .with(stdout_logging_layer)
                .with(file_logging_layer)
                .with(err_file_logging_layer)
                .with(slow_query_logging_layer)
        };

        // consume the `tracing_opts` to avoid "unused" warnings.
        let _ = tracing_opts;

        // Without tokio-console, the subscriber is the same stack minus the
        // console layer.
        #[cfg(not(feature = "tokio-console"))]
        let subscriber = Registry::default()
            .with(dyn_filter)
            .with(stdout_logging_layer)
            .with(file_logging_layer)
            .with(err_file_logging_layer)
            .with(slow_query_logging_layer);

        if opts.enable_otlp_tracing {
            global::set_text_map_propagator(TraceContextPropagator::new());

            // Parent-based sampling: follow the parent span's decision when
            // present; fall back to AlwaysOn when no ratio is configured.
            let sampler = opts
                .tracing_sample_ratio
                .as_ref()
                .map(create_sampler)
                .map(Sampler::ParentBased)
                .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));

            let trace_config = opentelemetry_sdk::trace::config()
                .with_sampler(sampler)
                .with_resource(opentelemetry_sdk::Resource::new(vec![
                    KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
                    KeyValue::new(
                        resource::SERVICE_INSTANCE_ID,
                        node_id.unwrap_or("none".to_string()),
                    ),
                    KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
                    KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
                ]));

            // Prepend "http://" if the configured endpoint has no scheme.
            let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
                opts.otlp_endpoint
                    .as_ref()
                    .map(|e| {
                        if e.starts_with("http") {
                            e.to_string()
                        } else {
                            format!("http://{}", e)
                        }
                    })
                    .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
            );

            // Batch exporter is spawned onto the Tokio runtime; this expects
            // an active Tokio runtime to be available.
            let tracer = opentelemetry_otlp::new_pipeline()
                .tracing()
                .with_exporter(exporter)
                .with_trace_config(trace_config)
                .install_batch(opentelemetry_sdk::runtime::Tokio)
                .expect("otlp tracer install failed");

            tracing::subscriber::set_global_default(
                subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
            )
            .expect("error setting global tracing subscriber");
        } else {
            tracing::subscriber::set_global_default(subscriber)
                .expect("error setting global tracing subscriber");
        }
    });

    guards
}
418
419fn build_slow_query_logger<S>(
420    opts: &LoggingOptions,
421    slow_query_opts: Option<&SlowQueryOptions>,
422    guards: &mut Vec<WorkerGuard>,
423) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
424where
425    S: tracing::Subscriber
426        + Send
427        + 'static
428        + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
429{
430    if let Some(slow_query_opts) = slow_query_opts {
431        if !opts.dir.is_empty()
432            && slow_query_opts.enable
433            && slow_query_opts.record_type == SlowQueriesRecordType::Log
434        {
435            let rolling_appender = RollingFileAppender::builder()
436                .rotation(Rotation::HOURLY)
437                .filename_prefix("greptimedb-slow-queries")
438                .max_log_files(opts.max_log_files)
439                .build(&opts.dir)
440                .unwrap_or_else(|e| {
441                    panic!(
442                        "initializing rolling file appender at {} failed: {}",
443                        &opts.dir, e
444                    )
445                });
446            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
447            guards.push(guard);
448
449            // Only logs if the field contains "slow".
450            let slow_query_filter = FilterFn::new(|metadata| {
451                metadata
452                    .fields()
453                    .iter()
454                    .any(|field| field.name().contains("slow"))
455            });
456
457            if opts.log_format == LogFormat::Json {
458                Some(
459                    Layer::new()
460                        .json()
461                        .with_writer(writer)
462                        .with_ansi(false)
463                        .with_filter(slow_query_filter)
464                        .boxed(),
465                )
466            } else {
467                Some(
468                    Layer::new()
469                        .with_writer(writer)
470                        .with_ansi(false)
471                        .with_filter(slow_query_filter)
472                        .boxed(),
473                )
474            }
475        } else {
476            None
477        }
478    } else {
479        None
480    }
481}