// common_telemetry/logging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Logging and tracing setup, inspired by databend.
16use std::collections::HashMap;
17use std::env;
18use std::io::IsTerminal;
19use std::sync::{Arc, Mutex, Once, RwLock};
20use std::time::Duration;
21
22use common_base::serde::empty_string_as_default;
23use once_cell::sync::{Lazy, OnceCell};
24use opentelemetry::trace::TracerProvider;
25use opentelemetry::{KeyValue, global};
26use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
27use opentelemetry_sdk::propagation::TraceContextPropagator;
28use opentelemetry_sdk::trace::{Sampler, Tracer};
29use opentelemetry_semantic_conventions::resource;
30use serde::{Deserialize, Serialize};
31use tracing::callsite;
32use tracing::metadata::LevelFilter;
33use tracing_appender::non_blocking::WorkerGuard;
34use tracing_appender::rolling::{RollingFileAppender, Rotation};
35use tracing_log::LogTracer;
36use tracing_subscriber::filter::{FilterFn, Targets};
37use tracing_subscriber::fmt::Layer;
38use tracing_subscriber::layer::{Layered, SubscriberExt};
39use tracing_subscriber::prelude::*;
40use tracing_subscriber::{EnvFilter, Registry, filter};
41
42use crate::tracing_sampler::{TracingSampleOptions, create_sampler};
43
44/// The default endpoint when use gRPC exporter protocol.
45pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";
46
47/// The default endpoint when use HTTP exporter protocol.
48pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318/v1/traces";
49
50/// The default logs directory.
51pub const DEFAULT_LOGGING_DIR: &str = "logs";
52
53/// Handle for reloading log level
54pub static LOG_RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
55    OnceCell::new();
56
57type DynSubscriber = Layered<tracing_subscriber::reload::Layer<Targets, Registry>, Registry>;
58type OtelTraceLayer = tracing_opentelemetry::OpenTelemetryLayer<DynSubscriber, Tracer>;
59
60#[derive(Clone)]
61pub struct TraceReloadHandle {
62    inner: Arc<RwLock<Option<OtelTraceLayer>>>,
63}
64
65impl TraceReloadHandle {
66    fn new(inner: Arc<RwLock<Option<OtelTraceLayer>>>) -> Self {
67        Self { inner }
68    }
69
70    pub fn reload(&self, new_layer: Option<OtelTraceLayer>) {
71        let mut guard = self.inner.write().unwrap();
72        *guard = new_layer;
73        drop(guard);
74
75        callsite::rebuild_interest_cache();
76    }
77}
78
79/// A tracing layer that can be dynamically reloaded.
80///
81/// Mostly copied from [`tracing_subscriber::reload::Layer`].
82struct TraceLayer {
83    inner: Arc<RwLock<Option<OtelTraceLayer>>>,
84}
85
86impl TraceLayer {
87    fn new(initial: Option<OtelTraceLayer>) -> (Self, TraceReloadHandle) {
88        let inner = Arc::new(RwLock::new(initial));
89        (
90            Self {
91                inner: inner.clone(),
92            },
93            TraceReloadHandle::new(inner),
94        )
95    }
96
97    fn with_layer<R>(&self, f: impl FnOnce(&OtelTraceLayer) -> R) -> Option<R> {
98        self.inner
99            .read()
100            .ok()
101            .and_then(|guard| guard.as_ref().map(f))
102    }
103
104    fn with_layer_mut<R>(&self, f: impl FnOnce(&mut OtelTraceLayer) -> R) -> Option<R> {
105        self.inner
106            .write()
107            .ok()
108            .and_then(|mut guard| guard.as_mut().map(f))
109    }
110}
111
112impl tracing_subscriber::Layer<DynSubscriber> for TraceLayer {
113    fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) {
114        let _ = self.with_layer(|layer| layer.on_register_dispatch(subscriber));
115    }
116
117    fn on_layer(&mut self, subscriber: &mut DynSubscriber) {
118        let _ = self.with_layer_mut(|layer| layer.on_layer(subscriber));
119    }
120
121    fn register_callsite(
122        &self,
123        metadata: &'static tracing::Metadata<'static>,
124    ) -> tracing::subscriber::Interest {
125        self.with_layer(|layer| layer.register_callsite(metadata))
126            .unwrap_or_else(tracing::subscriber::Interest::always)
127    }
128
129    fn enabled(
130        &self,
131        metadata: &tracing::Metadata<'_>,
132        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
133    ) -> bool {
134        self.with_layer(|layer| layer.enabled(metadata, ctx))
135            .unwrap_or(true)
136    }
137
138    fn on_new_span(
139        &self,
140        attrs: &tracing::span::Attributes<'_>,
141        id: &tracing::span::Id,
142        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
143    ) {
144        let _ = self.with_layer(|layer| layer.on_new_span(attrs, id, ctx));
145    }
146
147    fn max_level_hint(&self) -> Option<LevelFilter> {
148        self.with_layer(|layer| layer.max_level_hint()).flatten()
149    }
150
151    fn on_record(
152        &self,
153        span: &tracing::span::Id,
154        values: &tracing::span::Record<'_>,
155        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
156    ) {
157        let _ = self.with_layer(|layer| layer.on_record(span, values, ctx));
158    }
159
160    fn on_follows_from(
161        &self,
162        span: &tracing::span::Id,
163        follows: &tracing::span::Id,
164        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
165    ) {
166        let _ = self.with_layer(|layer| layer.on_follows_from(span, follows, ctx));
167    }
168
169    fn event_enabled(
170        &self,
171        event: &tracing::Event<'_>,
172        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
173    ) -> bool {
174        self.with_layer(|layer| layer.event_enabled(event, ctx))
175            .unwrap_or(true)
176    }
177
178    fn on_event(
179        &self,
180        event: &tracing::Event<'_>,
181        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
182    ) {
183        let _ = self.with_layer(|layer| layer.on_event(event, ctx));
184    }
185
186    fn on_enter(
187        &self,
188        id: &tracing::span::Id,
189        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
190    ) {
191        let _ = self.with_layer(|layer| layer.on_enter(id, ctx));
192    }
193
194    fn on_exit(
195        &self,
196        id: &tracing::span::Id,
197        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
198    ) {
199        let _ = self.with_layer(|layer| layer.on_exit(id, ctx));
200    }
201
202    fn on_close(
203        &self,
204        id: tracing::span::Id,
205        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
206    ) {
207        let _ = self.with_layer(|layer| layer.on_close(id, ctx));
208    }
209
210    fn on_id_change(
211        &self,
212        old: &tracing::span::Id,
213        new: &tracing::span::Id,
214        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
215    ) {
216        let _ = self.with_layer(|layer| layer.on_id_change(old, new, ctx));
217    }
218
219    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
220        self.inner.read().ok().and_then(|guard| {
221            guard
222                .as_ref()
223                .and_then(|layer| unsafe { layer.downcast_raw(id) })
224        })
225    }
226}
227
228/// Handle for reloading trace level
229pub static TRACE_RELOAD_HANDLE: OnceCell<TraceReloadHandle> = OnceCell::new();
230
231static TRACER: OnceCell<Mutex<TraceState>> = OnceCell::new();
232
233#[derive(Debug)]
234enum TraceState {
235    Ready(Tracer),
236    Deferred(TraceContext),
237}
238
239/// The logging options that used to initialize the logger.
240#[derive(Clone, Debug, Serialize, Deserialize)]
241#[serde(default)]
242pub struct LoggingOptions {
243    /// The directory to store log files. If not set, logs will be written to stdout.
244    pub dir: String,
245
246    /// The log level that can be one of "trace", "debug", "info", "warn", "error". Default is "info".
247    pub level: Option<String>,
248
249    /// The log format that can be one of "json" or "text". Default is "text".
250    #[serde(default, deserialize_with = "empty_string_as_default")]
251    pub log_format: LogFormat,
252
253    /// The maximum number of log files set by default.
254    pub max_log_files: usize,
255
256    /// Whether to append logs to stdout. Default is true.
257    pub append_stdout: bool,
258
259    /// Whether to enable tracing with OTLP. Default is false.
260    pub enable_otlp_tracing: bool,
261
262    /// The endpoint of OTLP.
263    pub otlp_endpoint: Option<String>,
264
265    /// The tracing sample ratio.
266    pub tracing_sample_ratio: Option<TracingSampleOptions>,
267
268    /// The protocol of OTLP export.
269    pub otlp_export_protocol: Option<OtlpExportProtocol>,
270
271    /// Additional HTTP headers for OTLP exporter.
272    #[serde(skip_serializing_if = "HashMap::is_empty")]
273    pub otlp_headers: HashMap<String, String>,
274}
275
276/// The protocol of OTLP export.
277#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
278#[serde(rename_all = "snake_case")]
279pub enum OtlpExportProtocol {
280    /// GRPC protocol.
281    Grpc,
282
283    /// HTTP protocol with binary protobuf.
284    Http,
285}
286
287/// The options of slow query.
288#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
289#[serde(default)]
290pub struct SlowQueryOptions {
291    /// Whether to enable slow query log.
292    pub enable: bool,
293
294    /// The record type of slow queries.
295    #[serde(deserialize_with = "empty_string_as_default")]
296    pub record_type: SlowQueriesRecordType,
297
298    /// The threshold of slow queries.
299    #[serde(with = "humantime_serde")]
300    pub threshold: Duration,
301
302    /// The sample ratio of slow queries.
303    pub sample_ratio: f64,
304
305    /// The table TTL of `slow_queries` system table. Default is "90d".
306    /// It's used when `record_type` is `SystemTable`.
307    #[serde(with = "humantime_serde")]
308    pub ttl: Duration,
309}
310
311impl Default for SlowQueryOptions {
312    fn default() -> Self {
313        Self {
314            enable: true,
315            record_type: SlowQueriesRecordType::SystemTable,
316            threshold: Duration::from_secs(30),
317            sample_ratio: 1.0,
318            ttl: Duration::from_days(90),
319        }
320    }
321}
322
323#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Default)]
324#[serde(rename_all = "snake_case")]
325pub enum SlowQueriesRecordType {
326    /// Record the slow query in the system table.
327    #[default]
328    SystemTable,
329    /// Record the slow query in a specific logs file.
330    Log,
331}
332
333#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
334#[serde(rename_all = "snake_case")]
335pub enum LogFormat {
336    Json,
337    #[default]
338    Text,
339}
340
341impl PartialEq for LoggingOptions {
342    fn eq(&self, other: &Self) -> bool {
343        self.dir == other.dir
344            && self.level == other.level
345            && self.enable_otlp_tracing == other.enable_otlp_tracing
346            && self.otlp_endpoint == other.otlp_endpoint
347            && self.tracing_sample_ratio == other.tracing_sample_ratio
348            && self.append_stdout == other.append_stdout
349    }
350}
351
352impl Eq for LoggingOptions {}
353
354#[derive(Clone, Debug)]
355struct TraceContext {
356    app_name: String,
357    node_id: String,
358    logging_opts: LoggingOptions,
359}
360
361impl Default for LoggingOptions {
362    fn default() -> Self {
363        Self {
364            // The directory path will be configured at application startup, typically using the data home directory as a base.
365            dir: "".to_string(),
366            level: None,
367            log_format: LogFormat::Text,
368            enable_otlp_tracing: false,
369            otlp_endpoint: None,
370            tracing_sample_ratio: None,
371            append_stdout: true,
372            // Rotation hourly, 24 files per day, keeps info log files of 30 days
373            max_log_files: 720,
374            otlp_export_protocol: None,
375            otlp_headers: HashMap::new(),
376        }
377    }
378}
379
380#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
381pub struct TracingOptions {
382    #[cfg(feature = "tokio-console")]
383    pub tokio_console_addr: Option<String>,
384}
385
386/// Init tracing for unittest.
387/// Write logs to file `unittest`.
388pub fn init_default_ut_logging() {
389    static START: Once = Once::new();
390
391    START.call_once(|| {
392        let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
393
394        // When running in Github's actions, env "UNITTEST_LOG_DIR" is set to a directory other
395        // than "/tmp".
396        // This is to fix the problem that the "/tmp" disk space of action runner's is small,
397        // if we write testing logs in it, actions would fail due to disk out of space error.
398        let dir =
399            env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
400
401        let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
402            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
403        );
404        let opts = LoggingOptions {
405            dir: dir.clone(),
406            level: Some(level),
407            ..Default::default()
408        };
409        *g = Some(init_global_logging(
410            "unittest",
411            &opts,
412            &TracingOptions::default(),
413            None,
414            None,
415        ));
416
417        crate::info!("logs dir = {}", dir);
418    });
419}
420
421static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
422    Lazy::new(|| Arc::new(Mutex::new(None)));
423
424const DEFAULT_LOG_TARGETS: &str = "info";
425
426#[allow(clippy::print_stdout)]
427pub fn init_global_logging(
428    app_name: &str,
429    opts: &LoggingOptions,
430    tracing_opts: &TracingOptions,
431    node_id: Option<String>,
432    slow_query_opts: Option<&SlowQueryOptions>,
433) -> Vec<WorkerGuard> {
434    static START: Once = Once::new();
435    let mut guards = vec![];
436    let node_id = node_id.unwrap_or_else(|| "none".to_string());
437
438    START.call_once(|| {
439        // Enable log compatible layer to convert log record to tracing span.
440        LogTracer::init().expect("log tracer must be valid");
441
442        // Configure the stdout logging layer.
443        let stdout_logging_layer = if opts.append_stdout {
444            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
445            guards.push(guard);
446
447            if opts.log_format == LogFormat::Json {
448                Some(
449                    Layer::new()
450                        .json()
451                        .with_writer(writer)
452                        .with_ansi(std::io::stdout().is_terminal())
453                        .boxed(),
454                )
455            } else {
456                Some(
457                    Layer::new()
458                        .with_writer(writer)
459                        .with_ansi(std::io::stdout().is_terminal())
460                        .boxed(),
461                )
462            }
463        } else {
464            None
465        };
466
467        // Configure the file logging layer with rolling policy.
468        let file_logging_layer = if !opts.dir.is_empty() {
469            let rolling_appender = RollingFileAppender::builder()
470                .rotation(Rotation::HOURLY)
471                .filename_prefix("greptimedb")
472                .max_log_files(opts.max_log_files)
473                .build(&opts.dir)
474                .unwrap_or_else(|e| {
475                    panic!(
476                        "initializing rolling file appender at {} failed: {}",
477                        &opts.dir, e
478                    )
479                });
480            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
481            guards.push(guard);
482
483            if opts.log_format == LogFormat::Json {
484                Some(
485                    Layer::new()
486                        .json()
487                        .with_writer(writer)
488                        .with_ansi(false)
489                        .boxed(),
490                )
491            } else {
492                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
493            }
494        } else {
495            None
496        };
497
498        // Configure the error file logging layer with rolling policy.
499        let err_file_logging_layer = if !opts.dir.is_empty() {
500            let rolling_appender = RollingFileAppender::builder()
501                .rotation(Rotation::HOURLY)
502                .filename_prefix("greptimedb-err")
503                .max_log_files(opts.max_log_files)
504                .build(&opts.dir)
505                .unwrap_or_else(|e| {
506                    panic!(
507                        "initializing rolling file appender at {} failed: {}",
508                        &opts.dir, e
509                    )
510                });
511            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
512            guards.push(guard);
513
514            if opts.log_format == LogFormat::Json {
515                Some(
516                    Layer::new()
517                        .json()
518                        .with_writer(writer)
519                        .with_ansi(false)
520                        .with_filter(filter::LevelFilter::ERROR)
521                        .boxed(),
522                )
523            } else {
524                Some(
525                    Layer::new()
526                        .with_writer(writer)
527                        .with_ansi(false)
528                        .with_filter(filter::LevelFilter::ERROR)
529                        .boxed(),
530                )
531            }
532        } else {
533            None
534        };
535
536        let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
537
538        // resolve log level settings from:
539        // - options from command line or config files
540        // - environment variable: RUST_LOG
541        // - default settings
542        let filter = opts
543            .level
544            .as_deref()
545            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
546            .unwrap_or(DEFAULT_LOG_TARGETS)
547            .parse::<filter::Targets>()
548            .expect("error parsing log level string");
549
550        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
551
552        LOG_RELOAD_HANDLE
553            .set(reload_handle)
554            .expect("reload handle already set, maybe init_global_logging get called twice?");
555
556        let mut initial_tracer = None;
557        let trace_state = if opts.enable_otlp_tracing {
558            let tracer = create_tracer(app_name, &node_id, opts);
559            initial_tracer = Some(tracer.clone());
560            TraceState::Ready(tracer)
561        } else {
562            TraceState::Deferred(TraceContext {
563                app_name: app_name.to_string(),
564                node_id: node_id.clone(),
565                logging_opts: opts.clone(),
566            })
567        };
568
569        TRACER
570            .set(Mutex::new(trace_state))
571            .expect("trace state already initialized");
572
573        let initial_trace_layer = initial_tracer
574            .as_ref()
575            .map(|tracer| tracing_opentelemetry::layer().with_tracer(tracer.clone()));
576
577        let (dyn_trace_layer, trace_reload_handle) = TraceLayer::new(initial_trace_layer);
578
579        TRACE_RELOAD_HANDLE
580            .set(trace_reload_handle)
581            .unwrap_or_else(|_| panic!("failed to set trace reload handle"));
582
583        // Must enable 'tokio_unstable' cfg to use this feature.
584        // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
585        #[cfg(feature = "tokio-console")]
586        let subscriber = {
587            let tokio_console_layer =
588                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
589                    let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
590                    panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
591                });
592                    println!("tokio-console listening on {addr}");
593
594                    Some(
595                        console_subscriber::ConsoleLayer::builder()
596                            .server_addr(addr)
597                            .spawn(),
598                    )
599                } else {
600                    None
601                };
602
603            Registry::default()
604                .with(dyn_filter)
605                .with(dyn_trace_layer)
606                .with(tokio_console_layer)
607                .with(stdout_logging_layer)
608                .with(file_logging_layer)
609                .with(err_file_logging_layer)
610                .with(slow_query_logging_layer)
611        };
612
613        // consume the `tracing_opts` to avoid "unused" warnings.
614        let _ = tracing_opts;
615
616        #[cfg(not(feature = "tokio-console"))]
617        let subscriber = Registry::default()
618            .with(dyn_filter)
619            .with(dyn_trace_layer)
620            .with(stdout_logging_layer)
621            .with(file_logging_layer)
622            .with(err_file_logging_layer)
623            .with(slow_query_logging_layer);
624
625        global::set_text_map_propagator(TraceContextPropagator::new());
626
627        tracing::subscriber::set_global_default(subscriber)
628            .expect("error setting global tracing subscriber");
629    });
630
631    guards
632}
633
634fn create_tracer(app_name: &str, node_id: &str, opts: &LoggingOptions) -> Tracer {
635    let sampler = opts
636        .tracing_sample_ratio
637        .as_ref()
638        .map(create_sampler)
639        .map(Sampler::ParentBased)
640        .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
641
642    let resource = opentelemetry_sdk::Resource::builder_empty()
643        .with_attributes([
644            KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
645            KeyValue::new(resource::SERVICE_INSTANCE_ID, node_id.to_string()),
646            KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
647            KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
648        ])
649        .build();
650
651    opentelemetry_sdk::trace::SdkTracerProvider::builder()
652        .with_batch_exporter(build_otlp_exporter(opts))
653        .with_sampler(sampler)
654        .with_resource(resource)
655        .build()
656        .tracer("greptimedb")
657}
658
659/// Ensure that the OTLP tracer has been constructed, building it lazily if needed.
660pub fn get_or_init_tracer() -> Result<Tracer, &'static str> {
661    let state = TRACER.get().ok_or("trace state is not initialized")?;
662    let mut guard = state.lock().expect("trace state lock poisoned");
663
664    match &mut *guard {
665        TraceState::Ready(tracer) => Ok(tracer.clone()),
666        TraceState::Deferred(context) => {
667            let tracer = create_tracer(&context.app_name, &context.node_id, &context.logging_opts);
668            *guard = TraceState::Ready(tracer.clone());
669            Ok(tracer)
670        }
671    }
672}
673
674fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter {
675    let protocol = opts
676        .otlp_export_protocol
677        .clone()
678        .unwrap_or(OtlpExportProtocol::Http);
679
680    let endpoint = opts
681        .otlp_endpoint
682        .as_ref()
683        .map(|e| {
684            if e.starts_with("http") {
685                e.clone()
686            } else {
687                format!("http://{}", e)
688            }
689        })
690        .unwrap_or_else(|| match protocol {
691            OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
692            OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
693        });
694
695    match protocol {
696        OtlpExportProtocol::Grpc => SpanExporter::builder()
697            .with_tonic()
698            .with_endpoint(endpoint)
699            .build()
700            .expect("Failed to create OTLP gRPC exporter "),
701
702        OtlpExportProtocol::Http => SpanExporter::builder()
703            .with_http()
704            .with_endpoint(endpoint)
705            .with_protocol(Protocol::HttpBinary)
706            .with_headers(opts.otlp_headers.clone())
707            .build()
708            .expect("Failed to create OTLP HTTP exporter "),
709    }
710}
711
712fn build_slow_query_logger<S>(
713    opts: &LoggingOptions,
714    slow_query_opts: Option<&SlowQueryOptions>,
715    guards: &mut Vec<WorkerGuard>,
716) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
717where
718    S: tracing::Subscriber
719        + Send
720        + 'static
721        + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
722{
723    if let Some(slow_query_opts) = slow_query_opts {
724        if !opts.dir.is_empty()
725            && slow_query_opts.enable
726            && slow_query_opts.record_type == SlowQueriesRecordType::Log
727        {
728            let rolling_appender = RollingFileAppender::builder()
729                .rotation(Rotation::HOURLY)
730                .filename_prefix("greptimedb-slow-queries")
731                .max_log_files(opts.max_log_files)
732                .build(&opts.dir)
733                .unwrap_or_else(|e| {
734                    panic!(
735                        "initializing rolling file appender at {} failed: {}",
736                        &opts.dir, e
737                    )
738                });
739            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
740            guards.push(guard);
741
742            // Only logs if the field contains "slow".
743            let slow_query_filter = FilterFn::new(|metadata| {
744                metadata
745                    .fields()
746                    .iter()
747                    .any(|field| field.name().contains("slow"))
748            });
749
750            if opts.log_format == LogFormat::Json {
751                Some(
752                    Layer::new()
753                        .json()
754                        .with_writer(writer)
755                        .with_ansi(false)
756                        .with_filter(slow_query_filter)
757                        .boxed(),
758                )
759            } else {
760                Some(
761                    Layer::new()
762                        .with_writer(writer)
763                        .with_ansi(false)
764                        .with_filter(slow_query_filter)
765                        .boxed(),
766                )
767            }
768        } else {
769            None
770        }
771    } else {
772        None
773    }
774}
775
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `LoggingOptions` from a JSON literal, panicking on failure.
    fn parse_logging(json: &str) -> LoggingOptions {
        serde_json::from_str(json).unwrap()
    }

    /// Parses `SlowQueryOptions` from a JSON literal, panicking on failure.
    fn parse_slow_query(json: &str) -> SlowQueryOptions {
        serde_json::from_str(json).unwrap()
    }

    #[test]
    fn test_logging_options_deserialization_default() {
        let opts = parse_logging("{}");

        assert_eq!(opts.log_format, LogFormat::Text);
        assert_eq!(opts.dir, "");
        assert_eq!(opts.level, None);
        assert!(opts.append_stdout);
    }

    #[test]
    fn test_logging_options_deserialization_empty_log_format() {
        // An empty string falls back to the default format (text).
        let opts = parse_logging(r#"{"log_format": ""}"#);
        assert_eq!(opts.log_format, LogFormat::Text);
    }

    #[test]
    fn test_logging_options_deserialization_valid_log_format() {
        let cases = [
            (r#"{"log_format": "json"}"#, LogFormat::Json),
            (r#"{"log_format": "text"}"#, LogFormat::Text),
        ];
        for (json, expected) in cases {
            assert_eq!(parse_logging(json).log_format, expected);
        }
    }

    #[test]
    fn test_logging_options_deserialization_missing_log_format() {
        // A missing format falls back to the default (text).
        let opts = parse_logging(r#"{"dir": "/tmp/logs"}"#);

        assert_eq!(opts.log_format, LogFormat::Text);
        assert_eq!(opts.dir, "/tmp/logs");
    }

    #[test]
    fn test_slow_query_options_deserialization_default() {
        let opts = parse_slow_query(r#"{"enable": true, "threshold": "30s"}"#);

        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(opts.enable);
    }

    #[test]
    fn test_slow_query_options_deserialization_empty_record_type() {
        // An empty string falls back to the default record type (system table).
        let opts =
            parse_slow_query(r#"{"enable": true, "record_type": "", "threshold": "30s"}"#);

        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(opts.enable);
    }

    #[test]
    fn test_slow_query_options_deserialization_valid_record_type() {
        let cases = [
            (
                r#"{"enable": true, "record_type": "system_table", "threshold": "30s"}"#,
                SlowQueriesRecordType::SystemTable,
            ),
            (
                r#"{"enable": true, "record_type": "log", "threshold": "30s"}"#,
                SlowQueriesRecordType::Log,
            ),
        ];
        for (json, expected) in cases {
            assert_eq!(parse_slow_query(json).record_type, expected);
        }
    }

    #[test]
    fn test_slow_query_options_deserialization_missing_record_type() {
        // A missing record type falls back to the default (system table).
        let opts = parse_slow_query(r#"{"enable": false, "threshold": "30s"}"#);

        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(!opts.enable);
    }

    #[test]
    fn test_otlp_export_protocol_deserialization_valid_values() {
        let cases = [
            (r#""grpc""#, OtlpExportProtocol::Grpc),
            (r#""http""#, OtlpExportProtocol::Http),
        ];
        for (json, expected) in cases {
            let protocol: OtlpExportProtocol = serde_json::from_str(json).unwrap();
            assert_eq!(protocol, expected);
        }
    }
}