common_telemetry/
logging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Logging utilities, inspired by Databend.
16use std::env;
17use std::sync::{Arc, Mutex, Once};
18use std::time::Duration;
19
20use once_cell::sync::{Lazy, OnceCell};
21use opentelemetry::{global, KeyValue};
22use opentelemetry_otlp::WithExportConfig;
23use opentelemetry_sdk::propagation::TraceContextPropagator;
24use opentelemetry_sdk::trace::Sampler;
25use opentelemetry_semantic_conventions::resource;
26use serde::{Deserialize, Serialize};
27use tracing_appender::non_blocking::WorkerGuard;
28use tracing_appender::rolling::{RollingFileAppender, Rotation};
29use tracing_log::LogTracer;
30use tracing_subscriber::filter::{FilterFn, Targets};
31use tracing_subscriber::fmt::Layer;
32use tracing_subscriber::layer::SubscriberExt;
33use tracing_subscriber::prelude::*;
34use tracing_subscriber::{filter, EnvFilter, Registry};
35
36use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
37
/// The OTLP endpoint used by `init_global_logging` when `LoggingOptions::otlp_endpoint` is
/// not set.
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
39
/// Handle for reloading the log level filter (a [`Targets`] filter) of the global subscriber.
///
/// It is set exactly once, inside `init_global_logging`; the cell is empty until then.
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
    OnceCell::new();
43
/// The logging options that are used to initialize the logger.
///
/// Missing fields are filled from [`Default`] when deserializing (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
    /// The directory to store log files. If empty, no log files are written at all
    /// (stdout output is controlled separately by `append_stdout`).
    pub dir: String,

    /// The log level filter. It is parsed as tracing `Targets`, so it can be a plain level
    /// ("trace", "debug", "info", "warn", "error") or comma-separated per-target directives
    /// such as "debug,hyper=warn". If unset, the `RUST_LOG` environment variable is used,
    /// and finally "info".
    pub level: Option<String>,

    /// The log format that can be one of "json" or "text". Default is "text".
    pub log_format: LogFormat,

    /// The maximum number of log files passed to each rolling file appender.
    /// Default is 720 (hourly rotation, i.e. 30 days of files).
    pub max_log_files: usize,

    /// Whether to append logs to stdout. Default is true.
    pub append_stdout: bool,

    /// Whether to enable tracing with OTLP. Default is false.
    pub enable_otlp_tracing: bool,

    /// The endpoint of OTLP. Default is "http://localhost:4317".
    /// A value without an HTTP scheme is prefixed with "http://".
    pub otlp_endpoint: Option<String>,

    /// The sampling options for OTLP tracing. If unset, a parent-based sampler that is
    /// always on is used.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,

    /// The logging options of slow query.
    pub slow_query: SlowQueryOptions,
}
75
/// The options of the slow query log.
///
/// The derived default is: disabled, no threshold and no sample ratio.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct SlowQueryOptions {
    /// Whether to enable slow query log. A dedicated slow query log file is only created
    /// when this is true and a log directory is configured.
    pub enable: bool,

    /// The threshold of slow queries. (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub threshold: Option<Duration>,

    /// The sample ratio of slow queries.
    pub sample_ratio: Option<f64>,
}
90
/// The output format of log records. Serialized in snake case, i.e. "json" or "text".
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
    /// Records are emitted through the JSON formatter of the fmt layer.
    Json,
    /// Records are emitted through the default text formatter of the fmt layer.
    Text,
}
97
98impl PartialEq for LoggingOptions {
99    fn eq(&self, other: &Self) -> bool {
100        self.dir == other.dir
101            && self.level == other.level
102            && self.enable_otlp_tracing == other.enable_otlp_tracing
103            && self.otlp_endpoint == other.otlp_endpoint
104            && self.tracing_sample_ratio == other.tracing_sample_ratio
105            && self.append_stdout == other.append_stdout
106    }
107}
108
109impl Eq for LoggingOptions {}
110
111impl Default for LoggingOptions {
112    fn default() -> Self {
113        Self {
114            dir: "./greptimedb_data/logs".to_string(),
115            level: None,
116            log_format: LogFormat::Text,
117            enable_otlp_tracing: false,
118            otlp_endpoint: None,
119            tracing_sample_ratio: None,
120            append_stdout: true,
121            slow_query: SlowQueryOptions::default(),
122            // Rotation hourly, 24 files per day, keeps info log files of 30 days
123            max_log_files: 720,
124        }
125    }
126}
127
/// Extra tracing options passed to `init_global_logging`.
///
/// The struct has no fields unless the `tokio-console` feature is enabled.
#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TracingOptions {
    /// The socket address the tokio-console server binds to. If unset, no tokio-console
    /// layer is installed.
    #[cfg(feature = "tokio-console")]
    pub tokio_console_addr: Option<String>,
}
133
134/// Init tracing for unittest.
135/// Write logs to file `unittest`.
136pub fn init_default_ut_logging() {
137    static START: Once = Once::new();
138
139    START.call_once(|| {
140        let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
141
142        // When running in Github's actions, env "UNITTEST_LOG_DIR" is set to a directory other
143        // than "/tmp".
144        // This is to fix the problem that the "/tmp" disk space of action runner's is small,
145        // if we write testing logs in it, actions would fail due to disk out of space error.
146        let dir =
147            env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
148
149        let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
150            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
151        );
152        let opts = LoggingOptions {
153            dir: dir.clone(),
154            level: Some(level),
155            ..Default::default()
156        };
157        *g = Some(init_global_logging(
158            "unittest",
159            &opts,
160            &TracingOptions::default(),
161            None
162        ));
163
164        crate::info!("logs dir = {}", dir);
165    });
166}
167
/// Stores the `WorkerGuard`s returned by `init_global_logging` for unit tests, so they are
/// not dropped when `init_default_ut_logging` returns. `None` until unit-test logging has
/// been initialized.
static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));
170
/// The fallback log filter, used when neither `LoggingOptions::level` nor the `RUST_LOG`
/// environment variable provides one.
const DEFAULT_LOG_TARGETS: &str = "info";
172
173#[allow(clippy::print_stdout)]
174pub fn init_global_logging(
175    app_name: &str,
176    opts: &LoggingOptions,
177    tracing_opts: &TracingOptions,
178    node_id: Option<String>,
179) -> Vec<WorkerGuard> {
180    static START: Once = Once::new();
181    let mut guards = vec![];
182
183    START.call_once(|| {
184        // Enable log compatible layer to convert log record to tracing span.
185        LogTracer::init().expect("log tracer must be valid");
186
187        // Configure the stdout logging layer.
188        let stdout_logging_layer = if opts.append_stdout {
189            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
190            guards.push(guard);
191
192            if opts.log_format == LogFormat::Json {
193                Some(
194                    Layer::new()
195                        .json()
196                        .with_writer(writer)
197                        .with_ansi(atty::is(atty::Stream::Stdout))
198                        .boxed(),
199                )
200            } else {
201                Some(
202                    Layer::new()
203                        .with_writer(writer)
204                        .with_ansi(atty::is(atty::Stream::Stdout))
205                        .boxed(),
206                )
207            }
208        } else {
209            None
210        };
211
212        // Configure the file logging layer with rolling policy.
213        let file_logging_layer = if !opts.dir.is_empty() {
214            let rolling_appender = RollingFileAppender::builder()
215                .rotation(Rotation::HOURLY)
216                .filename_prefix("greptimedb")
217                .max_log_files(opts.max_log_files)
218                .build(&opts.dir)
219                .unwrap_or_else(|e| {
220                    panic!(
221                        "initializing rolling file appender at {} failed: {}",
222                        &opts.dir, e
223                    )
224                });
225            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
226            guards.push(guard);
227
228            if opts.log_format == LogFormat::Json {
229                Some(
230                    Layer::new()
231                        .json()
232                        .with_writer(writer)
233                        .with_ansi(false)
234                        .boxed(),
235                )
236            } else {
237                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
238            }
239        } else {
240            None
241        };
242
243        // Configure the error file logging layer with rolling policy.
244        let err_file_logging_layer = if !opts.dir.is_empty() {
245            let rolling_appender = RollingFileAppender::builder()
246                .rotation(Rotation::HOURLY)
247                .filename_prefix("greptimedb-err")
248                .max_log_files(opts.max_log_files)
249                .build(&opts.dir)
250                .unwrap_or_else(|e| {
251                    panic!(
252                        "initializing rolling file appender at {} failed: {}",
253                        &opts.dir, e
254                    )
255                });
256            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
257            guards.push(guard);
258
259            if opts.log_format == LogFormat::Json {
260                Some(
261                    Layer::new()
262                        .json()
263                        .with_writer(writer)
264                        .with_ansi(false)
265                        .with_filter(filter::LevelFilter::ERROR)
266                        .boxed(),
267                )
268            } else {
269                Some(
270                    Layer::new()
271                        .with_writer(writer)
272                        .with_ansi(false)
273                        .with_filter(filter::LevelFilter::ERROR)
274                        .boxed(),
275                )
276            }
277        } else {
278            None
279        };
280
281        let slow_query_logging_layer = if !opts.dir.is_empty() && opts.slow_query.enable {
282            let rolling_appender = RollingFileAppender::builder()
283                .rotation(Rotation::HOURLY)
284                .filename_prefix("greptimedb-slow-queries")
285                .max_log_files(opts.max_log_files)
286                .build(&opts.dir)
287                .unwrap_or_else(|e| {
288                    panic!(
289                        "initializing rolling file appender at {} failed: {}",
290                        &opts.dir, e
291                    )
292                });
293            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
294            guards.push(guard);
295
296            // Only logs if the field contains "slow".
297            let slow_query_filter = FilterFn::new(|metadata| {
298                metadata
299                    .fields()
300                    .iter()
301                    .any(|field| field.name().contains("slow"))
302            });
303
304            if opts.log_format == LogFormat::Json {
305                Some(
306                    Layer::new()
307                        .json()
308                        .with_writer(writer)
309                        .with_ansi(false)
310                        .with_filter(slow_query_filter)
311                        .boxed(),
312                )
313            } else {
314                Some(
315                    Layer::new()
316                        .with_writer(writer)
317                        .with_ansi(false)
318                        .with_filter(slow_query_filter)
319                        .boxed(),
320                )
321            }
322        } else {
323            None
324        };
325
326        // resolve log level settings from:
327        // - options from command line or config files
328        // - environment variable: RUST_LOG
329        // - default settings
330        let filter = opts
331            .level
332            .as_deref()
333            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
334            .unwrap_or(DEFAULT_LOG_TARGETS)
335            .parse::<filter::Targets>()
336            .expect("error parsing log level string");
337
338        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
339
340        RELOAD_HANDLE
341            .set(reload_handle)
342            .expect("reload handle already set, maybe init_global_logging get called twice?");
343
344        // Must enable 'tokio_unstable' cfg to use this feature.
345        // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
346        #[cfg(feature = "tokio-console")]
347        let subscriber = {
348            let tokio_console_layer =
349                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
350                    let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
351                    panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
352                });
353                    println!("tokio-console listening on {addr}");
354
355                    Some(
356                        console_subscriber::ConsoleLayer::builder()
357                            .server_addr(addr)
358                            .spawn(),
359                    )
360                } else {
361                    None
362                };
363
364            Registry::default()
365                .with(dyn_filter)
366                .with(tokio_console_layer)
367                .with(stdout_logging_layer)
368                .with(file_logging_layer)
369                .with(err_file_logging_layer)
370                .with(slow_query_logging_layer)
371        };
372
373        // consume the `tracing_opts` to avoid "unused" warnings.
374        let _ = tracing_opts;
375
376        #[cfg(not(feature = "tokio-console"))]
377        let subscriber = Registry::default()
378            .with(dyn_filter)
379            .with(stdout_logging_layer)
380            .with(file_logging_layer)
381            .with(err_file_logging_layer)
382            .with(slow_query_logging_layer);
383
384        if opts.enable_otlp_tracing {
385            global::set_text_map_propagator(TraceContextPropagator::new());
386
387            let sampler = opts
388                .tracing_sample_ratio
389                .as_ref()
390                .map(create_sampler)
391                .map(Sampler::ParentBased)
392                .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
393
394            let trace_config = opentelemetry_sdk::trace::config()
395                .with_sampler(sampler)
396                .with_resource(opentelemetry_sdk::Resource::new(vec![
397                    KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
398                    KeyValue::new(
399                        resource::SERVICE_INSTANCE_ID,
400                        node_id.unwrap_or("none".to_string()),
401                    ),
402                    KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
403                    KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
404                ]));
405
406            let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
407                opts.otlp_endpoint
408                    .as_ref()
409                    .map(|e| {
410                        if e.starts_with("http") {
411                            e.to_string()
412                        } else {
413                            format!("http://{}", e)
414                        }
415                    })
416                    .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
417            );
418
419            let tracer = opentelemetry_otlp::new_pipeline()
420                .tracing()
421                .with_exporter(exporter)
422                .with_trace_config(trace_config)
423                .install_batch(opentelemetry_sdk::runtime::Tokio)
424                .expect("otlp tracer install failed");
425
426            tracing::subscriber::set_global_default(
427                subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
428            )
429            .expect("error setting global tracing subscriber");
430        } else {
431            tracing::subscriber::set_global_default(subscriber)
432                .expect("error setting global tracing subscriber");
433        }
434    });
435
436    guards
437}