common_telemetry/logging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Logging utilities, inspired by Databend.
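//!
//! A minimal usage sketch (marked `ignore`; the crate path is an assumption, and
//! OTLP tracing additionally requires a Tokio runtime):
//!
//! ```ignore
//! use common_telemetry::logging::{init_global_logging, LoggingOptions, TracingOptions};
//!
//! let opts = LoggingOptions {
//!     dir: "/var/log/greptimedb".to_string(),
//!     level: Some("info".to_string()),
//!     ..Default::default()
//! };
//! // Keep the returned guards alive for the lifetime of the process so the
//! // non-blocking writers can flush buffered logs.
//! let _guards = init_global_logging("my-app", &opts, &TracingOptions::default(), None, None);
//! ```
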
use std::env;
use std::io::IsTerminal;
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;

use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::{Protocol, SpanExporterBuilder, WithExportConfig};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_semantic_conventions::resource;
use serde::{Deserialize, Serialize};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_log::LogTracer;
use tracing_subscriber::filter::{FilterFn, Targets};
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};

use crate::tracing_sampler::{create_sampler, TracingSampleOptions};

/// The default endpoint when using the gRPC exporter protocol.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";

/// The default endpoint when using the HTTP exporter protocol.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318";

/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";

/// Handle for reloading the log level filter at runtime.
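///
/// A hedged sketch of changing the level after startup (assumes logging has
/// already been initialized so the handle is set):
///
/// ```ignore
/// use tracing_subscriber::filter::Targets;
///
/// if let Some(handle) = RELOAD_HANDLE.get() {
///     // Switch the global filter to "debug"; parsing accepts the same syntax
///     // as the `level` option, e.g. "debug,hyper=warn".
///     handle.reload("debug".parse::<Targets>().unwrap()).expect("failed to reload log level");
/// }
/// ```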
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
    OnceCell::new();

/// The logging options used to initialize the logger.
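///
/// A minimal construction sketch (field values are illustrative only):
///
/// ```ignore
/// let opts = LoggingOptions {
///     dir: "./logs".to_string(),
///     level: Some("info,hyper=warn".to_string()),
///     log_format: LogFormat::Json,
///     ..Default::default()
/// };
/// ```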
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
    /// The directory to store log files. If not set, logs will be written to stdout.
    pub dir: String,

    /// The log level. One of "trace", "debug", "info", "warn", "error". Default is "info".
    pub level: Option<String>,

    /// The log format. One of "json" or "text". Default is "text".
    pub log_format: LogFormat,

    /// The maximum number of rotated log files to keep.
    pub max_log_files: usize,

    /// Whether to append logs to stdout. Default is true.
    pub append_stdout: bool,

    /// Whether to enable tracing with OTLP. Default is false.
    pub enable_otlp_tracing: bool,

    /// The OTLP export endpoint. Default is "http://localhost:4318".
    pub otlp_endpoint: Option<String>,

    /// The tracing sample ratio.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,

    /// The protocol used for OTLP export.
    pub otlp_export_protocol: Option<OtlpExportProtocol>,
}

/// The protocol used for OTLP export.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
    /// gRPC protocol.
    Grpc,

    /// HTTP protocol with binary protobuf.
    Http,
}

/// Options for slow query logging.
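///
/// A construction sketch for recording slow queries to the log file rather than
/// the system table (values are illustrative only):
///
/// ```ignore
/// let slow_query_opts = SlowQueryOptions {
///     enable: true,
///     record_type: SlowQueriesRecordType::Log,
///     threshold: Some(std::time::Duration::from_secs(10)),
///     sample_ratio: Some(0.5),
///     ttl: None,
/// };
/// ```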
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SlowQueryOptions {
    /// Whether to enable the slow query log.
    pub enable: bool,

    /// The record type of slow queries.
    pub record_type: SlowQueriesRecordType,

    /// The threshold of slow queries.
    #[serde(with = "humantime_serde")]
    pub threshold: Option<Duration>,

    /// The sample ratio of slow queries.
    pub sample_ratio: Option<f64>,

    /// The TTL of the `slow_queries` system table. Default is "30d".
    /// Only used when `record_type` is `SystemTable`.
    pub ttl: Option<String>,
}

impl Default for SlowQueryOptions {
    fn default() -> Self {
        Self {
            enable: true,
            record_type: SlowQueriesRecordType::SystemTable,
            threshold: Some(Duration::from_secs(30)),
            sample_ratio: Some(1.0),
            ttl: Some("30d".to_string()),
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SlowQueriesRecordType {
    SystemTable,
    Log,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
    Json,
    Text,
}

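// Note: the manual `PartialEq` below compares only a subset of fields;
// `log_format`, `max_log_files`, and `otlp_export_protocol` do not take part
// in equality.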
impl PartialEq for LoggingOptions {
    fn eq(&self, other: &Self) -> bool {
        self.dir == other.dir
            && self.level == other.level
            && self.enable_otlp_tracing == other.enable_otlp_tracing
            && self.otlp_endpoint == other.otlp_endpoint
            && self.tracing_sample_ratio == other.tracing_sample_ratio
            && self.append_stdout == other.append_stdout
    }
}

impl Eq for LoggingOptions {}

impl Default for LoggingOptions {
    fn default() -> Self {
        Self {
            // The directory path will be configured at application startup, typically using the data home directory as a base.
            dir: "".to_string(),
            level: None,
            log_format: LogFormat::Text,
            enable_otlp_tracing: false,
            otlp_endpoint: None,
            tracing_sample_ratio: None,
            append_stdout: true,
            // Rotated hourly (24 files per day); keeps 30 days of log files.
            max_log_files: 720,
            otlp_export_protocol: None,
        }
    }
}

#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TracingOptions {
    #[cfg(feature = "tokio-console")]
    pub tokio_console_addr: Option<String>,
}

/// Initializes tracing for unit tests.
/// Logs are written to the directory given by the `UNITTEST_LOG_DIR` environment
/// variable (default "/tmp/__unittest_logs"), under the app name `unittest`.
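///
/// A typical call in a test (a sketch; the module path is an assumption):
///
/// ```ignore
/// #[test]
/// fn my_test() {
///     common_telemetry::logging::init_default_ut_logging();
///     // ... test body that emits logs ...
/// }
/// ```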
pub fn init_default_ut_logging() {
    static START: Once = Once::new();

    START.call_once(|| {
        let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();

        // When running in GitHub Actions, the env var "UNITTEST_LOG_DIR" is set to a
        // directory other than "/tmp", because the runner's "/tmp" partition is small;
        // writing test logs there can fail the job with an out-of-disk-space error.
        let dir =
            env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());

        let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
        );
        let opts = LoggingOptions {
            dir: dir.clone(),
            level: Some(level),
            ..Default::default()
        };
        *g = Some(init_global_logging(
            "unittest",
            &opts,
            &TracingOptions::default(),
            None,
            None,
        ));

        crate::info!("logs dir = {}", dir);
    });
}

static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

const DEFAULT_LOG_TARGETS: &str = "info";

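/// Initializes the global tracing subscriber: stdout, rolling-file, error-file,
/// and optional slow-query and OTLP tracing layers, depending on `opts`.
///
/// The returned `WorkerGuard`s must be kept alive for the lifetime of the process;
/// dropping them stops the non-blocking writers from flushing buffered logs.
/// Subsequent calls are no-ops (guarded by `Once`) and return an empty guard list.
///
/// A hedged usage sketch (argument values are illustrative only):
///
/// ```ignore
/// let opts = LoggingOptions::default();
/// let _guards = init_global_logging(
///     "greptimedb",
///     &opts,
///     &TracingOptions::default(),
///     Some("node-0".to_string()),
///     None,
/// );
/// ```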
#[allow(clippy::print_stdout)]
pub fn init_global_logging(
    app_name: &str,
    opts: &LoggingOptions,
    tracing_opts: &TracingOptions,
    node_id: Option<String>,
    slow_query_opts: Option<&SlowQueryOptions>,
) -> Vec<WorkerGuard> {
    static START: Once = Once::new();
    let mut guards = vec![];

    START.call_once(|| {
        // Enable the log-compat layer to convert `log` records into tracing events.
        LogTracer::init().expect("log tracer must be valid");

        // Configure the stdout logging layer.
        let stdout_logging_layer = if opts.append_stdout {
            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(std::io::stdout().is_terminal())
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(std::io::stdout().is_terminal())
                        .boxed(),
                )
            }
        } else {
            None
        };

        // Configure the file logging layer with rolling policy.
        let file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .boxed(),
                )
            } else {
                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
            }
        } else {
            None
        };

        // Configure the error file logging layer with rolling policy.
        let err_file_logging_layer = if !opts.dir.is_empty() {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb-err")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(filter::LevelFilter::ERROR)
                        .boxed(),
                )
            }
        } else {
            None
        };

        let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);

        // Resolve log level settings from:
        // - options from command line or config files
        // - environment variable: RUST_LOG
        // - default settings
        let filter = opts
            .level
            .as_deref()
            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
            .unwrap_or(DEFAULT_LOG_TARGETS)
            .parse::<filter::Targets>()
            .expect("error parsing log level string");

        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());

        RELOAD_HANDLE
            .set(reload_handle)
            .expect("reload handle already set; was init_global_logging called twice?");

        // Must enable the 'tokio_unstable' cfg to use this feature.
        // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
        #[cfg(feature = "tokio-console")]
        let subscriber = {
            let tokio_console_layer =
                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
                    let addr: std::net::SocketAddr =
                        tokio_console_addr.parse().unwrap_or_else(|e| {
                            panic!(
                                "Invalid binding address '{tokio_console_addr}' for tokio-console: {e}"
                            );
                        });
                    println!("tokio-console listening on {addr}");

                    Some(
                        console_subscriber::ConsoleLayer::builder()
                            .server_addr(addr)
                            .spawn(),
                    )
                } else {
                    None
                };

            Registry::default()
                .with(dyn_filter)
                .with(tokio_console_layer)
                .with(stdout_logging_layer)
                .with(file_logging_layer)
                .with(err_file_logging_layer)
                .with(slow_query_logging_layer)
        };

        // Consume `tracing_opts` so it isn't reported as unused when the
        // `tokio-console` feature is disabled.
        let _ = tracing_opts;

        #[cfg(not(feature = "tokio-console"))]
        let subscriber = Registry::default()
            .with(dyn_filter)
            .with(stdout_logging_layer)
            .with(file_logging_layer)
            .with(err_file_logging_layer)
            .with(slow_query_logging_layer);

        if opts.enable_otlp_tracing {
            global::set_text_map_propagator(TraceContextPropagator::new());

            let sampler = opts
                .tracing_sample_ratio
                .as_ref()
                .map(create_sampler)
                .map(Sampler::ParentBased)
                .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));

            let trace_config = opentelemetry_sdk::trace::config()
                .with_sampler(sampler)
                .with_resource(opentelemetry_sdk::Resource::new(vec![
                    KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
                    KeyValue::new(
                        resource::SERVICE_INSTANCE_ID,
                        node_id.unwrap_or("none".to_string()),
                    ),
                    KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
                    KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
                ]));

            let tracer = opentelemetry_otlp::new_pipeline()
                .tracing()
                .with_exporter(build_otlp_exporter(opts))
                .with_trace_config(trace_config)
                .install_batch(opentelemetry_sdk::runtime::Tokio)
                .expect("otlp tracer install failed");

            tracing::subscriber::set_global_default(
                subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
            )
            .expect("error setting global tracing subscriber");
        } else {
            tracing::subscriber::set_global_default(subscriber)
                .expect("error setting global tracing subscriber");
        }
    });

    guards
}

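/// Builds the OTLP span exporter from `opts`.
///
/// Uses the configured export protocol, defaulting to HTTP; prepends "http://" to the
/// endpoint when no scheme is given, and falls back to the default gRPC/HTTP endpoints
/// when no endpoint is configured.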
fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporterBuilder {
    let protocol = opts
        .otlp_export_protocol
        .clone()
        .unwrap_or(OtlpExportProtocol::Http);

    let endpoint = opts
        .otlp_endpoint
        .as_ref()
        .map(|e| {
            if e.starts_with("http") {
                e.to_string()
            } else {
                format!("http://{}", e)
            }
        })
        .unwrap_or_else(|| match protocol {
            OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
            OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
        });

    match protocol {
        OtlpExportProtocol::Grpc => SpanExporterBuilder::Tonic(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint(endpoint),
        ),
        OtlpExportProtocol::Http => SpanExporterBuilder::Http(
            opentelemetry_otlp::new_exporter()
                .http()
                .with_endpoint(endpoint)
                .with_protocol(Protocol::HttpBinary),
        ),
    }
}

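/// Builds the slow query logging layer.
///
/// Returns a rolling-file layer (prefix "greptimedb-slow-queries") only when a log
/// directory is configured, slow query logging is enabled, and the record type is
/// `Log`; the layer only keeps events that carry a field whose name contains "slow".
/// Returns `None` otherwise.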
fn build_slow_query_logger<S>(
    opts: &LoggingOptions,
    slow_query_opts: Option<&SlowQueryOptions>,
    guards: &mut Vec<WorkerGuard>,
) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
where
    S: tracing::Subscriber
        + Send
        + 'static
        + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
{
    if let Some(slow_query_opts) = slow_query_opts {
        if !opts.dir.is_empty()
            && slow_query_opts.enable
            && slow_query_opts.record_type == SlowQueriesRecordType::Log
        {
            let rolling_appender = RollingFileAppender::builder()
                .rotation(Rotation::HOURLY)
                .filename_prefix("greptimedb-slow-queries")
                .max_log_files(opts.max_log_files)
                .build(&opts.dir)
                .unwrap_or_else(|e| {
                    panic!(
                        "initializing rolling file appender at {} failed: {}",
                        &opts.dir, e
                    )
                });
            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
            guards.push(guard);

            // Only keep events that have a field whose name contains "slow".
            let slow_query_filter = FilterFn::new(|metadata| {
                metadata
                    .fields()
                    .iter()
                    .any(|field| field.name().contains("slow"))
            });

            if opts.log_format == LogFormat::Json {
                Some(
                    Layer::new()
                        .json()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(slow_query_filter)
                        .boxed(),
                )
            } else {
                Some(
                    Layer::new()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_filter(slow_query_filter)
                        .boxed(),
                )
            }
        } else {
            None
        }
    } else {
        None
    }
}