1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
25use common_meta::distributed_time_constants::META_LEASE_SECS;
26use common_meta::kv_backend::chroot::ChrootKvBackend;
27use common_meta::kv_backend::etcd::EtcdStore;
28use common_meta::kv_backend::memory::MemoryKvBackend;
29#[cfg(feature = "mysql_kvbackend")]
30use common_meta::kv_backend::rds::MySqlStore;
31#[cfg(feature = "pg_kvbackend")]
32use common_meta::kv_backend::rds::PgStore;
33use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
34use common_telemetry::info;
35#[cfg(feature = "pg_kvbackend")]
36use deadpool_postgres::{Config, Runtime};
37use etcd_client::Client;
38use servers::configurator::ConfiguratorRef;
39use servers::export_metrics::ExportMetricsTask;
40use servers::http::{HttpServer, HttpServerBuilder};
41use servers::metrics_handler::MetricsHandler;
42use servers::server::Server;
43#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
44use snafu::OptionExt;
45use snafu::ResultExt;
46#[cfg(feature = "mysql_kvbackend")]
47use sqlx::mysql::MySqlConnectOptions;
48#[cfg(feature = "mysql_kvbackend")]
49use sqlx::mysql::MySqlPool;
50use tokio::net::TcpListener;
51use tokio::sync::mpsc::{self, Receiver, Sender};
52use tokio::sync::{oneshot, Mutex};
53#[cfg(feature = "pg_kvbackend")]
54use tokio_postgres::NoTls;
55use tonic::codec::CompressionEncoding;
56use tonic::transport::server::{Router, TcpIncoming};
57
58use crate::election::etcd::EtcdElection;
59#[cfg(feature = "mysql_kvbackend")]
60use crate::election::rds::mysql::MySqlElection;
61#[cfg(feature = "pg_kvbackend")]
62use crate::election::rds::postgres::PgElection;
63#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
64use crate::election::CANDIDATE_LEASE_SECS;
65use crate::metasrv::builder::MetasrvBuilder;
66use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
67use crate::node_excluder::NodeExcluderRef;
68use crate::selector::lease_based::LeaseBasedSelector;
69use crate::selector::load_based::LoadBasedSelector;
70use crate::selector::round_robin::RoundRobinSelector;
71use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
72use crate::selector::SelectorType;
73use crate::service::admin;
74use crate::{error, Result};
75
/// A runnable metasrv: the [`Metasrv`] core together with its gRPC/HTTP
/// servers and the bookkeeping needed to start and shut them down.
pub struct MetasrvInstance {
    /// The metasrv core; `new` also registers this `Arc` into `plugins`.
    metasrv: Arc<Metasrv>,

    /// HTTP server serving metrics and the TOML-rendered options.
    http_server: HttpServer,

    /// Options this instance was created with.
    opts: MetasrvOptions,

    /// Sends the graceful-shutdown signal to the gRPC server; `None` until `start` runs.
    signal_sender: Option<Sender<()>>,

    /// Plugin registry handed in at construction; returned by `plugins()`.
    plugins: Plugins,

    /// Optional metrics-export task built from `opts.export_metrics`.
    export_metrics_task: Option<ExportMetricsTask>,

    /// Receiver for the gRPC serve task's final result; filled by `start`,
    /// taken (and inspected for an error) by `shutdown`.
    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,

    /// Address the gRPC server actually bound to; `None` until `start` binds it.
    bind_addr: Option<SocketAddr>,
}
95
96impl MetasrvInstance {
97 pub async fn new(
98 opts: MetasrvOptions,
99 plugins: Plugins,
100 metasrv: Metasrv,
101 ) -> Result<MetasrvInstance> {
102 let http_server = HttpServerBuilder::new(opts.http.clone())
103 .with_metrics_handler(MetricsHandler)
104 .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
105 .build();
106
107 let metasrv = Arc::new(metasrv);
108 plugins.insert::<Arc<Metasrv>>(metasrv.clone());
110 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
111 .context(error::InitExportMetricsTaskSnafu)?;
112 Ok(MetasrvInstance {
113 metasrv,
114 http_server,
115 opts,
116 signal_sender: None,
117 plugins,
118 export_metrics_task,
119 serve_state: Default::default(),
120 bind_addr: None,
121 })
122 }
123
124 pub async fn start(&mut self) -> Result<()> {
125 self.metasrv.try_start().await?;
126
127 if let Some(t) = self.export_metrics_task.as_ref() {
128 t.start(None).context(error::InitExportMetricsTaskSnafu)?
129 }
130
131 let (tx, rx) = mpsc::channel::<()>(1);
132
133 self.signal_sender = Some(tx);
134
135 let mut router = router(self.metasrv.clone());
136 if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
137 router = configurator.config_grpc(router);
138 }
139
140 let (serve_state_tx, serve_state_rx) = oneshot::channel();
141
142 let socket_addr =
143 bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
144 .await?;
145 self.bind_addr = Some(socket_addr);
146
147 let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
148 addr: &self.opts.http.addr,
149 })?;
150 self.http_server
151 .start(addr)
152 .await
153 .context(error::StartHttpSnafu)?;
154
155 *self.serve_state.lock().await = Some(serve_state_rx);
156 Ok(())
157 }
158
159 pub async fn shutdown(&self) -> Result<()> {
160 if let Some(mut rx) = self.serve_state.lock().await.take() {
161 if let Ok(Err(err)) = rx.try_recv() {
162 common_telemetry::error!(err; "Metasrv start failed")
163 }
164 }
165 if let Some(signal) = &self.signal_sender {
166 signal
167 .send(())
168 .await
169 .context(error::SendShutdownSignalSnafu)?;
170 }
171 self.metasrv.shutdown().await?;
172 self.http_server
173 .shutdown()
174 .await
175 .context(error::ShutdownServerSnafu {
176 server: self.http_server.name(),
177 })?;
178 Ok(())
179 }
180
181 pub fn plugins(&self) -> Plugins {
182 self.plugins.clone()
183 }
184
185 pub fn get_inner(&self) -> &Metasrv {
186 &self.metasrv
187 }
188 pub fn bind_addr(&self) -> &Option<SocketAddr> {
189 &self.bind_addr
190 }
191}
192
193pub async fn bootstrap_metasrv_with_router(
194 bind_addr: &str,
195 router: Router,
196 serve_state_tx: oneshot::Sender<Result<()>>,
197 mut shutdown_rx: Receiver<()>,
198) -> Result<SocketAddr> {
199 let listener = TcpListener::bind(bind_addr)
200 .await
201 .context(error::TcpBindSnafu { addr: bind_addr })?;
202
203 let real_bind_addr = listener
204 .local_addr()
205 .context(error::TcpBindSnafu { addr: bind_addr })?;
206
207 info!("gRPC server is bound to: {}", real_bind_addr);
208
209 let incoming =
210 TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;
211
212 let _handle = common_runtime::spawn_global(async move {
213 let result = router
214 .serve_with_incoming_shutdown(incoming, async {
215 let _ = shutdown_rx.recv().await;
216 })
217 .await
218 .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
219 .context(error::StartGrpcSnafu);
220 let _ = serve_state_tx.send(result);
221 });
222
223 Ok(real_bind_addr)
224}
225
/// Registers `$server` on the tonic `$builder` with gzip and zstd compression
/// enabled for both incoming (`accept_compressed`) and outgoing
/// (`send_compressed`) messages. The macro evaluates to whatever
/// `$builder.add_service(..)` returns.
///
/// Note: `CompressionEncoding` is referenced unqualified, so it must be in
/// scope (e.g. `use tonic::codec::CompressionEncoding;`) wherever this macro is
/// invoked.
#[macro_export]
macro_rules! add_compressed_service {
    ($builder:expr, $server:expr) => {
        $builder.add_service(
            $server
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
238
239pub fn router(metasrv: Arc<Metasrv>) -> Router {
240 let mut router = tonic::transport::Server::builder().accept_http1(true); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
242 let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
243 let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
244 let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
245 router.add_service(admin::make_admin_service(metasrv))
246}
247
/// Assembles a [`MetasrvBuilder`] from `opts` and `plugins`.
///
/// The kv backend and the leader election are chosen as follows:
/// - if `kv_backend` is `Some`, it is used as-is and no election is configured;
/// - otherwise `opts.backend` selects an in-memory store (no election), etcd,
///   or (when the matching cargo feature is enabled) PostgreSQL / MySQL. Each
///   persistent backend is paired with its own election implementation.
///
/// A non-empty `opts.store_key_prefix` wraps the chosen backend in a
/// [`ChrootKvBackend`]. The selector comes from `plugins` when a
/// [`SelectorRef`] is registered there. Otherwise it is built from
/// `opts.selector`, using the [`NodeExcluderRef`] plugin or an empty excluder.
///
/// # Errors
/// Returns an error if connecting to etcd, creating a Postgres/MySQL pool, or
/// initializing the chosen store or election fails.
pub async fn metasrv_builder(
    opts: &MetasrvOptions,
    plugins: Plugins,
    kv_backend: Option<KvBackendRef>,
) -> Result<MetasrvBuilder> {
    let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
        // A caller-supplied backend wins regardless of `opts.backend`; no election.
        (Some(kv_backend), _) => (kv_backend, None),
        (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
        (None, BackendImpl::EtcdStore) => {
            // One etcd client is shared by the kv store and the election.
            let etcd_client = create_etcd_client(&opts.store_addrs).await?;
            let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
            let election = EtcdElection::with_etcd_client(
                &opts.grpc.server_addr,
                etcd_client,
                opts.store_key_prefix.clone(),
            )
            .await?;

            (kv_backend, Some(election))
        }
        #[cfg(feature = "pg_kvbackend")]
        (None, BackendImpl::PostgresStore) => {
            use std::time::Duration;

            use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;

            use crate::election::rds::postgres::ElectionPgClient;

            // Lease TTLs and client-side timeouts, derived from the shared lease constants.
            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);

            // The election gets its own pool with TCP keepalives enabled.
            let mut cfg = Config::new();
            cfg.keepalives = Some(true);
            cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
            let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;

            let election_client = ElectionPgClient::new(
                pool,
                execution_timeout,
                idle_session_timeout,
                statement_timeout,
            )?;
            // Unlike the MySQL branch, the election is given `opts.meta_table_name` directly.
            let election = PgElection::with_pg_client(
                opts.grpc.server_addr.clone(),
                election_client,
                opts.store_key_prefix.clone(),
                candidate_lease_ttl,
                meta_lease_ttl,
                &opts.meta_table_name,
                opts.meta_election_lock_id,
            )
            .await?;

            // The kv store uses a separate pool built from the default config.
            let pool = create_postgres_pool(&opts.store_addrs).await?;
            let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
                .await
                .context(error::KvBackendSnafu)?;

            (kv_backend, Some(election))
        }
        #[cfg(feature = "mysql_kvbackend")]
        (None, BackendImpl::MysqlStore) => {
            use std::time::Duration;

            use crate::election::rds::mysql::ElectionMysqlClient;

            let pool = create_mysql_pool(&opts.store_addrs).await?;
            let kv_backend =
                MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
                    .await
                    .context(error::KvBackendSnafu)?;
            // Election state lives in a sibling table named `<meta_table_name>_election`.
            let election_table_name = opts.meta_table_name.clone() + "_election";
            // The election uses a second pool, separate from the kv store's.
            let pool = create_mysql_pool(&opts.store_addrs).await?;
            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
            // Half the meta lease; presumably InnoDB's lock wait timeout (name has a
            // typo) — confirm against `ElectionMysqlClient`.
            let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);

            let election_client = ElectionMysqlClient::new(
                pool,
                execution_timeout,
                statement_timeout,
                innode_lock_wait_timeout,
                idle_session_timeout,
                &election_table_name,
            );
            let election = MySqlElection::with_mysql_client(
                opts.grpc.server_addr.clone(),
                election_client,
                opts.store_key_prefix.clone(),
                candidate_lease_ttl,
                meta_lease_ttl,
                &election_table_name,
            )
            .await?;
            (kv_backend, Some(election))
        }
    };

    // Namespace all keys under the configured prefix, if any.
    if !opts.store_key_prefix.is_empty() {
        info!(
            "using chroot kv backend with prefix: {prefix}",
            prefix = opts.store_key_prefix
        );
        kv_backend = Arc::new(ChrootKvBackend::new(
            opts.store_key_prefix.clone().into_bytes(),
            kv_backend,
        ))
    }

    let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;

    // Fall back to an empty excluder when no plugin provides one.
    let node_excluder = plugins
        .get::<NodeExcluderRef>()
        .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
    // A selector registered as a plugin takes precedence over `opts.selector`.
    let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
        info!("Using selector from plugins");
        selector
    } else {
        let selector = match opts.selector {
            SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
                RegionNumsBasedWeightCompute,
                node_excluder,
            )) as SelectorRef,
            SelectorType::LeaseBased => {
                Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
            }
            SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
                SelectTarget::Datanode,
                node_excluder,
            )) as SelectorRef,
        };
        info!(
            "Using selector from options, selector type: {}",
            opts.selector.as_ref()
        );
        selector
    };

    Ok(MetasrvBuilder::new()
        .options(opts.clone())
        .kv_backend(kv_backend)
        .in_memory(in_memory)
        .selector(selector)
        .election(election)
        .plugins(plugins))
}
403
404pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
405 let etcd_endpoints = store_addrs
406 .iter()
407 .map(|x| x.trim())
408 .filter(|x| !x.is_empty())
409 .collect::<Vec<_>>();
410 Client::connect(&etcd_endpoints, None)
411 .await
412 .context(error::ConnectEtcdSnafu)
413}
414
/// Creates a Postgres connection pool for the first address in `store_addrs`,
/// using a default [`Config`].
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
    let default_cfg = Config::new();
    create_postgres_pool_with(store_addrs, default_cfg).await
}
422
/// Creates a Postgres connection pool from `cfg`, using the first entry of
/// `store_addrs` as the connection URL. Any further entries are ignored.
///
/// # Errors
/// Returns `InvalidArguments` when `store_addrs` is empty, or
/// `CreatePostgresPool` if the pool cannot be built.
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool_with(
    store_addrs: &[String],
    mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
    let url = store_addrs
        .first()
        .context(error::InvalidArgumentsSnafu {
            err_msg: "empty store addrs",
        })?
        .clone();
    cfg.url = Some(url);

    cfg.create_pool(Some(Runtime::Tokio1), NoTls)
        .context(error::CreatePostgresPoolSnafu)
}
440
/// Parses the first entry of `store_addrs` as a MySQL URL into connect options.
///
/// sqlx's `no_engine_substitution`, `pipes_as_concat`, `timezone` and
/// `set_names` session options are all switched off. This is presumably for
/// compatibility with servers that reject those session statements; confirm
/// before changing.
///
/// # Errors
/// Returns `InvalidArguments` when `store_addrs` is empty, or `ParseMySqlUrl`
/// if the URL cannot be parsed.
#[cfg(feature = "mysql_kvbackend")]
async fn setup_mysql_options(store_addrs: &[String]) -> Result<MySqlConnectOptions> {
    let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
        err_msg: "empty store addrs",
    })?;
    let parsed = mysql_url
        .parse::<MySqlConnectOptions>()
        .context(error::ParseMySqlUrlSnafu { mysql_url })?;

    Ok(parsed
        .no_engine_substitution(false)
        .pipes_as_concat(false)
        .timezone(None)
        .set_names(false))
}
457
/// Opens a MySQL connection pool for the first address in `store_addrs`.
///
/// # Errors
/// Propagates option-parsing errors, or returns `CreateMySqlPool` if the
/// pool cannot connect.
#[cfg(feature = "mysql_kvbackend")]
pub async fn create_mysql_pool(store_addrs: &[String]) -> Result<MySqlPool> {
    let connect_opts = setup_mysql_options(store_addrs).await?;
    MySqlPool::connect_with(connect_opts)
        .await
        .context(error::CreateMySqlPoolSnafu)
}