Skip to main content

common_telemetry/
logging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Logging and tracing setup, inspired by databend.
16use std::collections::HashMap;
17use std::env;
18use std::io::IsTerminal;
19use std::sync::{Arc, Mutex, Once, RwLock};
20use std::time::Duration;
21
22use common_base::serde::empty_string_as_default;
23use once_cell::sync::{Lazy, OnceCell};
24use opentelemetry::trace::TracerProvider;
25use opentelemetry::{KeyValue, global};
26use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
27use opentelemetry_sdk::propagation::TraceContextPropagator;
28use opentelemetry_sdk::trace::{Sampler, Tracer};
29use opentelemetry_semantic_conventions::resource;
30use serde::{Deserialize, Serialize};
31use tracing::callsite;
32use tracing::metadata::LevelFilter;
33use tracing_appender::non_blocking::WorkerGuard;
34use tracing_appender::rolling::{RollingFileAppender, Rotation};
35use tracing_log::LogTracer;
36use tracing_subscriber::filter::{FilterFn, Targets};
37use tracing_subscriber::fmt::Layer;
38use tracing_subscriber::layer::{Layered, SubscriberExt};
39use tracing_subscriber::prelude::*;
40use tracing_subscriber::{EnvFilter, Registry, filter};
41
42use crate::tracing_sampler::{TracingSampleOptions, create_sampler};
43
/// The default endpoint used when the gRPC exporter protocol is selected.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";

/// The default endpoint used when the HTTP exporter protocol is selected.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318/v1/traces";

/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";

/// Handle for reloading the log level filter at runtime.
///
/// Set once by `init_global_logging`; empty until then.
pub static LOG_RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
    OnceCell::new();

// The subscriber type the trace layer is stacked on: the registry wrapped by the
// reloadable `Targets` filter.
type DynSubscriber = Layered<tracing_subscriber::reload::Layer<Targets, Registry>, Registry>;
// The OpenTelemetry layer specialised for `DynSubscriber` and the SDK `Tracer`.
type OtelTraceLayer = tracing_opentelemetry::OpenTelemetryLayer<DynSubscriber, Tracer>;
59
60#[derive(Clone)]
61pub struct TraceReloadHandle {
62    inner: Arc<RwLock<Option<OtelTraceLayer>>>,
63}
64
65impl TraceReloadHandle {
66    fn new(inner: Arc<RwLock<Option<OtelTraceLayer>>>) -> Self {
67        Self { inner }
68    }
69
70    pub fn reload(&self, new_layer: Option<OtelTraceLayer>) {
71        let mut guard = self.inner.write().unwrap();
72        *guard = new_layer;
73        drop(guard);
74
75        callsite::rebuild_interest_cache();
76    }
77}
78
79/// A tracing layer that can be dynamically reloaded.
80///
81/// Mostly copied from [`tracing_subscriber::reload::Layer`].
82struct TraceLayer {
83    inner: Arc<RwLock<Option<OtelTraceLayer>>>,
84}
85
86impl TraceLayer {
87    fn new(initial: Option<OtelTraceLayer>) -> (Self, TraceReloadHandle) {
88        let inner = Arc::new(RwLock::new(initial));
89        (
90            Self {
91                inner: inner.clone(),
92            },
93            TraceReloadHandle::new(inner),
94        )
95    }
96
97    fn with_layer<R>(&self, f: impl FnOnce(&OtelTraceLayer) -> R) -> Option<R> {
98        self.inner
99            .read()
100            .ok()
101            .and_then(|guard| guard.as_ref().map(f))
102    }
103
104    fn with_layer_mut<R>(&self, f: impl FnOnce(&mut OtelTraceLayer) -> R) -> Option<R> {
105        self.inner
106            .write()
107            .ok()
108            .and_then(|mut guard| guard.as_mut().map(f))
109    }
110}
111
112impl tracing_subscriber::Layer<DynSubscriber> for TraceLayer {
113    fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) {
114        let _ = self.with_layer(|layer| layer.on_register_dispatch(subscriber));
115    }
116
117    fn on_layer(&mut self, subscriber: &mut DynSubscriber) {
118        let _ = self.with_layer_mut(|layer| layer.on_layer(subscriber));
119    }
120
121    fn register_callsite(
122        &self,
123        metadata: &'static tracing::Metadata<'static>,
124    ) -> tracing::subscriber::Interest {
125        self.with_layer(|layer| layer.register_callsite(metadata))
126            .unwrap_or_else(tracing::subscriber::Interest::always)
127    }
128
129    fn enabled(
130        &self,
131        metadata: &tracing::Metadata<'_>,
132        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
133    ) -> bool {
134        self.with_layer(|layer| layer.enabled(metadata, ctx))
135            .unwrap_or(true)
136    }
137
138    fn on_new_span(
139        &self,
140        attrs: &tracing::span::Attributes<'_>,
141        id: &tracing::span::Id,
142        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
143    ) {
144        let _ = self.with_layer(|layer| layer.on_new_span(attrs, id, ctx));
145    }
146
147    fn max_level_hint(&self) -> Option<LevelFilter> {
148        self.with_layer(|layer| layer.max_level_hint()).flatten()
149    }
150
151    fn on_record(
152        &self,
153        span: &tracing::span::Id,
154        values: &tracing::span::Record<'_>,
155        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
156    ) {
157        let _ = self.with_layer(|layer| layer.on_record(span, values, ctx));
158    }
159
160    fn on_follows_from(
161        &self,
162        span: &tracing::span::Id,
163        follows: &tracing::span::Id,
164        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
165    ) {
166        let _ = self.with_layer(|layer| layer.on_follows_from(span, follows, ctx));
167    }
168
169    fn event_enabled(
170        &self,
171        event: &tracing::Event<'_>,
172        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
173    ) -> bool {
174        self.with_layer(|layer| layer.event_enabled(event, ctx))
175            .unwrap_or(true)
176    }
177
178    fn on_event(
179        &self,
180        event: &tracing::Event<'_>,
181        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
182    ) {
183        let _ = self.with_layer(|layer| layer.on_event(event, ctx));
184    }
185
186    fn on_enter(
187        &self,
188        id: &tracing::span::Id,
189        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
190    ) {
191        let _ = self.with_layer(|layer| layer.on_enter(id, ctx));
192    }
193
194    fn on_exit(
195        &self,
196        id: &tracing::span::Id,
197        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
198    ) {
199        let _ = self.with_layer(|layer| layer.on_exit(id, ctx));
200    }
201
202    fn on_close(
203        &self,
204        id: tracing::span::Id,
205        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
206    ) {
207        let _ = self.with_layer(|layer| layer.on_close(id, ctx));
208    }
209
210    fn on_id_change(
211        &self,
212        old: &tracing::span::Id,
213        new: &tracing::span::Id,
214        ctx: tracing_subscriber::layer::Context<'_, DynSubscriber>,
215    ) {
216        let _ = self.with_layer(|layer| layer.on_id_change(old, new, ctx));
217    }
218
219    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
220        self.inner.read().ok().and_then(|guard| {
221            guard
222                .as_ref()
223                .and_then(|layer| unsafe { layer.downcast_raw(id) })
224        })
225    }
226}
227
/// Handle for reloading the trace layer at runtime.
///
/// Set once by `init_global_logging`; empty until then.
pub static TRACE_RELOAD_HANDLE: OnceCell<TraceReloadHandle> = OnceCell::new();

/// Process-wide tracer state, set once by `init_global_logging` and read by
/// `get_or_init_tracer`.
static TRACER: OnceCell<Mutex<TraceState>> = OnceCell::new();

/// Whether the OTLP tracer has been constructed yet.
#[derive(Debug)]
enum TraceState {
    /// The tracer has been built and can be cloned out.
    Ready(Tracer),
    /// The tracer has not been built; holds the inputs needed to build it later.
    Deferred(TraceContext),
}
238
/// The logging options that are used to initialize the logger.
///
/// Because of `#[serde(default)]`, any field missing from the input is taken from
/// `LoggingOptions::default()`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
    /// The directory to store log files. If empty, `init_global_logging` creates no
    /// file-based logging layers (stdout output is controlled by `append_stdout`).
    pub dir: String,

    /// The log level, e.g. one of "trace", "debug", "info", "warn", "error", or a
    /// comma-separated list of per-target directives such as "debug,hyper=warn".
    /// When unset, the `RUST_LOG` environment variable is consulted, then "info".
    pub level: Option<String>,

    /// The log format that can be one of "json" or "text". Default is "text".
    /// An empty string is also accepted and yields the default.
    #[serde(default, deserialize_with = "empty_string_as_default")]
    pub log_format: LogFormat,

    /// The maximum number of rolled log files to keep per log file prefix. Default is 720.
    pub max_log_files: usize,

    /// Whether to append logs to stdout. Default is true.
    pub append_stdout: bool,

    /// Whether to enable tracing with OTLP at startup. Default is false.
    pub enable_otlp_tracing: bool,

    /// The endpoint of OTLP. When unset, the default endpoint of the selected export
    /// protocol is used. An endpoint given without a scheme is prefixed with `http://`.
    pub otlp_endpoint: Option<String>,

    /// The tracing sample ratio. When unset, a parent-based always-on sampler is used.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,

    /// The protocol of OTLP export. When unset, HTTP is used.
    pub otlp_export_protocol: Option<OtlpExportProtocol>,

    /// Additional HTTP headers for OTLP exporter. Only passed to the HTTP exporter;
    /// omitted from serialized output when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub otlp_headers: HashMap<String, String>,

    /// Whether to enable per-region metrics. Default is false.
    pub enable_per_region_metrics: bool,
}
278
/// The protocol of OTLP export.
///
/// Serialized in snake case, i.e. as `"grpc"` and `"http"`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
    /// GRPC protocol.
    Grpc,

    /// HTTP protocol with binary protobuf.
    Http,
}
289
/// The options of slow query.
///
/// Because of `#[serde(default)]`, any field missing from the input is taken from
/// `SlowQueryOptions::default()`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SlowQueryOptions {
    /// Whether to enable slow query log. Default is true.
    pub enable: bool,

    /// The record type of slow queries. Default is `SystemTable`.
    /// An empty string is also accepted and yields the default.
    #[serde(deserialize_with = "empty_string_as_default")]
    pub record_type: SlowQueriesRecordType,

    /// The threshold of slow queries, written in humantime form (e.g. "30s").
    /// Default is 30 seconds.
    #[serde(with = "humantime_serde")]
    pub threshold: Duration,

    /// The sample ratio of slow queries. Default is 1.0.
    pub sample_ratio: f64,

    /// The table TTL of `slow_queries` system table. Default is "90d".
    /// It's used when `record_type` is `SystemTable`.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
}
313
314impl Default for SlowQueryOptions {
315    fn default() -> Self {
316        Self {
317            enable: true,
318            record_type: SlowQueriesRecordType::SystemTable,
319            threshold: Duration::from_secs(30),
320            sample_ratio: 1.0,
321            ttl: Duration::from_days(90),
322        }
323    }
324}
325
/// Where slow queries are recorded.
///
/// Serialized in snake case, i.e. as `"system_table"` and `"log"`.
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SlowQueriesRecordType {
    /// Record the slow query in the system table.
    #[default]
    SystemTable,
    /// Record the slow query in a specific logs file.
    Log,
}
335
/// The output format of log lines.
///
/// Serialized in snake case, i.e. as `"json"` and `"text"`.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
    /// JSON-formatted log lines.
    Json,
    /// Plain text log lines (the default).
    #[default]
    Text,
}
343
344impl PartialEq for LoggingOptions {
345    fn eq(&self, other: &Self) -> bool {
346        self.dir == other.dir
347            && self.level == other.level
348            && self.enable_otlp_tracing == other.enable_otlp_tracing
349            && self.otlp_endpoint == other.otlp_endpoint
350            && self.tracing_sample_ratio == other.tracing_sample_ratio
351            && self.append_stdout == other.append_stdout
352            && self.enable_per_region_metrics == other.enable_per_region_metrics
353    }
354}
355
356impl Eq for LoggingOptions {}
357
/// The inputs captured at logging initialization that are needed to build the OTLP
/// tracer later, when tracing was not enabled at startup.
#[derive(Clone, Debug)]
struct TraceContext {
    // Passed to `create_tracer` as the application name.
    app_name: String,
    // Passed to `create_tracer` as the node id.
    node_id: String,
    // A copy of the logging options the process was initialized with.
    logging_opts: LoggingOptions,
}
364
365impl Default for LoggingOptions {
366    fn default() -> Self {
367        Self {
368            // The directory path will be configured at application startup, typically using the data home directory as a base.
369            dir: "".to_string(),
370            level: None,
371            log_format: LogFormat::Text,
372            enable_otlp_tracing: false,
373            otlp_endpoint: None,
374            tracing_sample_ratio: None,
375            append_stdout: true,
376            // Rotation hourly, 24 files per day, keeps info log files of 30 days
377            max_log_files: 720,
378            otlp_export_protocol: None,
379            otlp_headers: HashMap::new(),
380            enable_per_region_metrics: false,
381        }
382    }
383}
384
/// Extra tracing options. The struct has no fields unless the `tokio-console` feature
/// is enabled.
#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TracingOptions {
    /// The socket address the tokio-console server binds to; no console layer is
    /// installed when unset.
    #[cfg(feature = "tokio-console")]
    pub tokio_console_addr: Option<String>,
}
390
391/// Init tracing for unittest.
392/// Write logs to file `unittest`.
393pub fn init_default_ut_logging() {
394    static START: Once = Once::new();
395
396    START.call_once(|| {
397        let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
398
399        // When running in Github's actions, env "UNITTEST_LOG_DIR" is set to a directory other
400        // than "/tmp".
401        // This is to fix the problem that the "/tmp" disk space of action runner's is small,
402        // if we write testing logs in it, actions would fail due to disk out of space error.
403        let dir =
404            env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
405
406        let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
407            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
408        );
409        let opts = LoggingOptions {
410            dir: dir.clone(),
411            level: Some(level),
412            ..Default::default()
413        };
414        *g = Some(init_global_logging(
415            "unittest",
416            &opts,
417            &TracingOptions::default(),
418            None,
419            None,
420        ));
421
422        crate::info!("logs dir = {}", dir);
423    });
424}
425
// Holds the worker guards created by `init_default_ut_logging` so they are not dropped
// while tests are running. `None` until that function has run.
static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

// The log filter used when neither `LoggingOptions::level` nor `RUST_LOG` is set.
const DEFAULT_LOG_TARGETS: &str = "info";
430
431#[allow(clippy::print_stdout)]
432pub fn init_global_logging(
433    app_name: &str,
434    opts: &LoggingOptions,
435    tracing_opts: &TracingOptions,
436    node_id: Option<String>,
437    slow_query_opts: Option<&SlowQueryOptions>,
438) -> Vec<WorkerGuard> {
439    static START: Once = Once::new();
440    let mut guards = vec![];
441    let node_id = node_id.unwrap_or_else(|| "none".to_string());
442
443    START.call_once(|| {
444        // Enable log compatible layer to convert log record to tracing span.
445        LogTracer::init().expect("log tracer must be valid");
446
447        // Configure the stdout logging layer.
448        let stdout_logging_layer = if opts.append_stdout {
449            let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
450            guards.push(guard);
451
452            if opts.log_format == LogFormat::Json {
453                Some(
454                    Layer::new()
455                        .json()
456                        .with_writer(writer)
457                        .with_ansi(std::io::stdout().is_terminal())
458                        .boxed(),
459                )
460            } else {
461                Some(
462                    Layer::new()
463                        .with_writer(writer)
464                        .with_ansi(std::io::stdout().is_terminal())
465                        .boxed(),
466                )
467            }
468        } else {
469            None
470        };
471
472        // Configure the file logging layer with rolling policy.
473        let file_logging_layer = if !opts.dir.is_empty() {
474            let rolling_appender = RollingFileAppender::builder()
475                .rotation(Rotation::HOURLY)
476                .filename_prefix("greptimedb")
477                .max_log_files(opts.max_log_files)
478                .build(&opts.dir)
479                .unwrap_or_else(|e| {
480                    panic!(
481                        "initializing rolling file appender at {} failed: {}",
482                        &opts.dir, e
483                    )
484                });
485            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
486            guards.push(guard);
487
488            if opts.log_format == LogFormat::Json {
489                Some(
490                    Layer::new()
491                        .json()
492                        .with_writer(writer)
493                        .with_ansi(false)
494                        .boxed(),
495                )
496            } else {
497                Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
498            }
499        } else {
500            None
501        };
502
503        // Configure the error file logging layer with rolling policy.
504        let err_file_logging_layer = if !opts.dir.is_empty() {
505            let rolling_appender = RollingFileAppender::builder()
506                .rotation(Rotation::HOURLY)
507                .filename_prefix("greptimedb-err")
508                .max_log_files(opts.max_log_files)
509                .build(&opts.dir)
510                .unwrap_or_else(|e| {
511                    panic!(
512                        "initializing rolling file appender at {} failed: {}",
513                        &opts.dir, e
514                    )
515                });
516            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
517            guards.push(guard);
518
519            if opts.log_format == LogFormat::Json {
520                Some(
521                    Layer::new()
522                        .json()
523                        .with_writer(writer)
524                        .with_ansi(false)
525                        .with_filter(filter::LevelFilter::ERROR)
526                        .boxed(),
527                )
528            } else {
529                Some(
530                    Layer::new()
531                        .with_writer(writer)
532                        .with_ansi(false)
533                        .with_filter(filter::LevelFilter::ERROR)
534                        .boxed(),
535                )
536            }
537        } else {
538            None
539        };
540
541        let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
542
543        // resolve log level settings from:
544        // - options from command line or config files
545        // - environment variable: RUST_LOG
546        // - default settings
547        let filter = opts
548            .level
549            .as_deref()
550            .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
551            .unwrap_or(DEFAULT_LOG_TARGETS)
552            .parse::<filter::Targets>()
553            .expect("error parsing log level string");
554
555        let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
556
557        LOG_RELOAD_HANDLE
558            .set(reload_handle)
559            .expect("reload handle already set, maybe init_global_logging get called twice?");
560
561        let mut initial_tracer = None;
562        let trace_state = if opts.enable_otlp_tracing {
563            let tracer = create_tracer(app_name, &node_id, opts);
564            initial_tracer = Some(tracer.clone());
565            TraceState::Ready(tracer)
566        } else {
567            TraceState::Deferred(TraceContext {
568                app_name: app_name.to_string(),
569                node_id: node_id.clone(),
570                logging_opts: opts.clone(),
571            })
572        };
573
574        TRACER
575            .set(Mutex::new(trace_state))
576            .expect("trace state already initialized");
577
578        let initial_trace_layer = initial_tracer
579            .as_ref()
580            .map(|tracer| tracing_opentelemetry::layer().with_tracer(tracer.clone()));
581
582        let (dyn_trace_layer, trace_reload_handle) = TraceLayer::new(initial_trace_layer);
583
584        TRACE_RELOAD_HANDLE
585            .set(trace_reload_handle)
586            .unwrap_or_else(|_| panic!("failed to set trace reload handle"));
587
588        // Must enable 'tokio_unstable' cfg to use this feature.
589        // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
590        #[cfg(feature = "tokio-console")]
591        let subscriber = {
592            let tokio_console_layer =
593                if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
594                    let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
595                    panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
596                });
597                    println!("tokio-console listening on {addr}");
598
599                    Some(
600                        console_subscriber::ConsoleLayer::builder()
601                            .server_addr(addr)
602                            .spawn(),
603                    )
604                } else {
605                    None
606                };
607
608            Registry::default()
609                .with(dyn_filter)
610                .with(dyn_trace_layer)
611                .with(tokio_console_layer)
612                .with(stdout_logging_layer)
613                .with(file_logging_layer)
614                .with(err_file_logging_layer)
615                .with(slow_query_logging_layer)
616        };
617
618        // consume the `tracing_opts` to avoid "unused" warnings.
619        let _ = tracing_opts;
620
621        #[cfg(not(feature = "tokio-console"))]
622        let subscriber = Registry::default()
623            .with(dyn_filter)
624            .with(dyn_trace_layer)
625            .with(stdout_logging_layer)
626            .with(file_logging_layer)
627            .with(err_file_logging_layer)
628            .with(slow_query_logging_layer);
629
630        global::set_text_map_propagator(TraceContextPropagator::new());
631
632        tracing::subscriber::set_global_default(subscriber)
633            .expect("error setting global tracing subscriber");
634    });
635
636    guards
637}
638
639fn create_tracer(app_name: &str, node_id: &str, opts: &LoggingOptions) -> Tracer {
640    let sampler = opts
641        .tracing_sample_ratio
642        .as_ref()
643        .map(create_sampler)
644        .map(Sampler::ParentBased)
645        .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
646
647    let resource = opentelemetry_sdk::Resource::builder_empty()
648        .with_attributes([
649            KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
650            KeyValue::new(resource::SERVICE_INSTANCE_ID, node_id.to_string()),
651            KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
652            KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
653        ])
654        .build();
655
656    opentelemetry_sdk::trace::SdkTracerProvider::builder()
657        .with_batch_exporter(build_otlp_exporter(opts))
658        .with_sampler(sampler)
659        .with_resource(resource)
660        .build()
661        .tracer("greptimedb")
662}
663
664/// Ensure that the OTLP tracer has been constructed, building it lazily if needed.
665pub fn get_or_init_tracer() -> Result<Tracer, &'static str> {
666    let state = TRACER.get().ok_or("trace state is not initialized")?;
667    let mut guard = state.lock().expect("trace state lock poisoned");
668
669    match &mut *guard {
670        TraceState::Ready(tracer) => Ok(tracer.clone()),
671        TraceState::Deferred(context) => {
672            let tracer = create_tracer(&context.app_name, &context.node_id, &context.logging_opts);
673            *guard = TraceState::Ready(tracer.clone());
674            Ok(tracer)
675        }
676    }
677}
678
679fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter {
680    let protocol = opts
681        .otlp_export_protocol
682        .clone()
683        .unwrap_or(OtlpExportProtocol::Http);
684
685    let endpoint = opts
686        .otlp_endpoint
687        .as_ref()
688        .map(|e| {
689            if e.starts_with("http") {
690                e.clone()
691            } else {
692                format!("http://{}", e)
693            }
694        })
695        .unwrap_or_else(|| match protocol {
696            OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
697            OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
698        });
699
700    match protocol {
701        OtlpExportProtocol::Grpc => SpanExporter::builder()
702            .with_tonic()
703            .with_endpoint(endpoint)
704            .build()
705            .expect("Failed to create OTLP gRPC exporter "),
706
707        OtlpExportProtocol::Http => SpanExporter::builder()
708            .with_http()
709            .with_endpoint(endpoint)
710            .with_protocol(Protocol::HttpBinary)
711            .with_headers(opts.otlp_headers.clone())
712            .build()
713            .expect("Failed to create OTLP HTTP exporter "),
714    }
715}
716
717fn build_slow_query_logger<S>(
718    opts: &LoggingOptions,
719    slow_query_opts: Option<&SlowQueryOptions>,
720    guards: &mut Vec<WorkerGuard>,
721) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
722where
723    S: tracing::Subscriber
724        + Send
725        + 'static
726        + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
727{
728    if let Some(slow_query_opts) = slow_query_opts {
729        if !opts.dir.is_empty()
730            && slow_query_opts.enable
731            && slow_query_opts.record_type == SlowQueriesRecordType::Log
732        {
733            let rolling_appender = RollingFileAppender::builder()
734                .rotation(Rotation::HOURLY)
735                .filename_prefix("greptimedb-slow-queries")
736                .max_log_files(opts.max_log_files)
737                .build(&opts.dir)
738                .unwrap_or_else(|e| {
739                    panic!(
740                        "initializing rolling file appender at {} failed: {}",
741                        &opts.dir, e
742                    )
743                });
744            let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
745            guards.push(guard);
746
747            // Only logs if the field contains "slow".
748            let slow_query_filter = FilterFn::new(|metadata| {
749                metadata
750                    .fields()
751                    .iter()
752                    .any(|field| field.name().contains("slow"))
753            });
754
755            if opts.log_format == LogFormat::Json {
756                Some(
757                    Layer::new()
758                        .json()
759                        .with_writer(writer)
760                        .with_ansi(false)
761                        .with_filter(slow_query_filter)
762                        .boxed(),
763                )
764            } else {
765                Some(
766                    Layer::new()
767                        .with_writer(writer)
768                        .with_ansi(false)
769                        .with_filter(slow_query_filter)
770                        .boxed(),
771                )
772            }
773        } else {
774            None
775        }
776    } else {
777        None
778    }
779}
780
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json` into [`LoggingOptions`], panicking on invalid input.
    fn parse_logging(json: &str) -> LoggingOptions {
        serde_json::from_str(json).unwrap()
    }

    /// Parses `json` into [`SlowQueryOptions`], panicking on invalid input.
    fn parse_slow_query(json: &str) -> SlowQueryOptions {
        serde_json::from_str(json).unwrap()
    }

    /// Parses `json` into [`OtlpExportProtocol`], panicking on invalid input.
    fn parse_protocol(json: &str) -> OtlpExportProtocol {
        serde_json::from_str(json).unwrap()
    }

    #[test]
    fn test_logging_options_deserialization_default() {
        let opts = parse_logging(r#"{}"#);

        assert_eq!(opts.log_format, LogFormat::Text);
        assert_eq!(opts.dir, "");
        assert_eq!(opts.level, None);
        assert!(opts.append_stdout);
    }

    #[test]
    fn test_logging_options_deserialization_empty_log_format() {
        let opts = parse_logging(r#"{"log_format": ""}"#);

        // Empty string should use default (Text)
        assert_eq!(opts.log_format, LogFormat::Text);
    }

    #[test]
    fn test_logging_options_deserialization_valid_log_format() {
        let json_opts = parse_logging(r#"{"log_format": "json"}"#);
        assert_eq!(json_opts.log_format, LogFormat::Json);

        let text_opts = parse_logging(r#"{"log_format": "text"}"#);
        assert_eq!(text_opts.log_format, LogFormat::Text);
    }

    #[test]
    fn test_logging_options_deserialization_missing_log_format() {
        let opts = parse_logging(r#"{"dir": "/tmp/logs"}"#);

        // Missing log_format should use default (Text)
        assert_eq!(opts.log_format, LogFormat::Text);
        assert_eq!(opts.dir, "/tmp/logs");
    }

    #[test]
    fn test_slow_query_options_deserialization_default() {
        let opts = parse_slow_query(r#"{"enable": true, "threshold": "30s"}"#);

        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(opts.enable);
    }

    #[test]
    fn test_slow_query_options_deserialization_empty_record_type() {
        let opts = parse_slow_query(r#"{"enable": true, "record_type": "", "threshold": "30s"}"#);

        // Empty string should use default (SystemTable)
        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(opts.enable);
    }

    #[test]
    fn test_slow_query_options_deserialization_valid_record_type() {
        let system_table_opts = parse_slow_query(
            r#"{"enable": true, "record_type": "system_table", "threshold": "30s"}"#,
        );
        assert_eq!(
            system_table_opts.record_type,
            SlowQueriesRecordType::SystemTable
        );

        let log_opts =
            parse_slow_query(r#"{"enable": true, "record_type": "log", "threshold": "30s"}"#);
        assert_eq!(log_opts.record_type, SlowQueriesRecordType::Log);
    }

    #[test]
    fn test_slow_query_options_deserialization_missing_record_type() {
        let opts = parse_slow_query(r#"{"enable": false, "threshold": "30s"}"#);

        // Missing record_type should use default (SystemTable)
        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        assert!(!opts.enable);
    }

    #[test]
    fn test_otlp_export_protocol_deserialization_valid_values() {
        assert_eq!(parse_protocol(r#""grpc""#), OtlpExportProtocol::Grpc);
        assert_eq!(parse_protocol(r#""http""#), OtlpExportProtocol::Http);
    }
}