1use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_options::datanode::DatanodeClientOptions;
21use common_options::memory::MemoryOptions;
22use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
23use meta_client::MetaClientOptions;
24use query::options::QueryOptions;
25use serde::{Deserialize, Serialize};
26use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
27use servers::grpc::GrpcOptions;
28use servers::heartbeat_options::HeartbeatOptions;
29use servers::http::HttpOptions;
30use servers::server::ServerHandlers;
31use snafu::ResultExt;
32
33use crate::error;
34use crate::error::Result;
35use crate::heartbeat::HeartbeatTask;
36use crate::instance::Instance;
37use crate::instance::prom_store::ExportMetricHandler;
38use crate::service_config::{
39 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
40 PromStoreOptions,
41};
42
43#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
44#[serde(default)]
45pub struct FrontendOptions {
46 pub node_id: Option<String>,
47 pub default_timezone: Option<String>,
48 pub default_column_prefix: Option<String>,
49 pub heartbeat: HeartbeatOptions,
50 pub http: HttpOptions,
51 pub grpc: GrpcOptions,
52 pub internal_grpc: Option<GrpcOptions>,
55 pub mysql: MysqlOptions,
56 pub postgres: PostgresOptions,
57 pub opentsdb: OpentsdbOptions,
58 pub influxdb: InfluxdbOptions,
59 pub prom_store: PromStoreOptions,
60 pub jaeger: JaegerOptions,
61 pub otlp: OtlpOptions,
62 pub meta_client: Option<MetaClientOptions>,
63 pub logging: LoggingOptions,
64 pub datanode: DatanodeClientOptions,
65 pub user_provider: Option<String>,
66 pub export_metrics: ExportMetricsOption,
67 pub tracing: TracingOptions,
68 pub query: QueryOptions,
69 pub max_in_flight_write_bytes: Option<ReadableSize>,
70 pub slow_query: SlowQueryOptions,
71 pub memory: MemoryOptions,
72 pub event_recorder: EventRecorderOptions,
74}
75
76impl Default for FrontendOptions {
77 fn default() -> Self {
78 Self {
79 node_id: None,
80 default_timezone: None,
81 default_column_prefix: None,
82 heartbeat: HeartbeatOptions::frontend_default(),
83 http: HttpOptions::default(),
84 grpc: GrpcOptions::default(),
85 internal_grpc: None,
86 mysql: MysqlOptions::default(),
87 postgres: PostgresOptions::default(),
88 opentsdb: OpentsdbOptions::default(),
89 influxdb: InfluxdbOptions::default(),
90 jaeger: JaegerOptions::default(),
91 prom_store: PromStoreOptions::default(),
92 otlp: OtlpOptions::default(),
93 meta_client: None,
94 logging: LoggingOptions::default(),
95 datanode: DatanodeClientOptions::default(),
96 user_provider: None,
97 export_metrics: ExportMetricsOption::default(),
98 tracing: TracingOptions::default(),
99 query: QueryOptions::default(),
100 max_in_flight_write_bytes: None,
101 slow_query: SlowQueryOptions::default(),
102 memory: MemoryOptions::default(),
103 event_recorder: EventRecorderOptions::default(),
104 }
105 }
106}
107
108impl Configurable for FrontendOptions {
109 fn env_list_keys() -> Option<&'static [&'static str]> {
110 Some(&["meta_client.metasrv_addrs"])
111 }
112}
113
114pub struct Frontend {
117 pub instance: Arc<Instance>,
118 pub servers: ServerHandlers,
119 pub heartbeat_task: Option<HeartbeatTask>,
120 pub export_metrics_task: Option<ExportMetricsTask>,
121}
122
123impl Frontend {
124 pub async fn start(&mut self) -> Result<()> {
125 if let Some(t) = &self.heartbeat_task {
126 t.start().await?;
127 }
128
129 if let Some(t) = self.export_metrics_task.as_ref() {
130 if t.send_by_handler {
131 let inserter = self.instance.inserter().clone();
132 let statement_executor = self.instance.statement_executor().clone();
133 let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
134 t.start(Some(handler)).context(error::StartServerSnafu)?
135 } else {
136 t.start(None).context(error::StartServerSnafu)?;
137 }
138 }
139
140 self.servers
141 .start_all()
142 .await
143 .context(error::StartServerSnafu)
144 }
145
146 pub async fn shutdown(&mut self) -> Result<()> {
147 self.servers
148 .shutdown_all()
149 .await
150 .context(error::ShutdownServerSnafu)
151 }
152
153 pub fn server_handlers(&self) -> &ServerHandlers {
154 &self.servers
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161
162 #[test]
163 fn test_toml() {
164 let opts = FrontendOptions::default();
165 let toml_string = toml::to_string(&opts).unwrap();
166 let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
167 }
168}