1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::config_server::ConfigServer;
20use api::v1::meta::heartbeat_server::HeartbeatServer;
21use api::v1::meta::procedure_service_server::ProcedureServiceServer;
22use api::v1::meta::store_server::StoreServer;
23use common_base::Plugins;
24use common_config::Configurable;
25#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
26use common_meta::distributed_time_constants::META_LEASE_SECS;
27use common_meta::kv_backend::chroot::ChrootKvBackend;
28use common_meta::kv_backend::etcd::EtcdStore;
29use common_meta::kv_backend::memory::MemoryKvBackend;
30use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
31use common_telemetry::info;
32use either::Either;
33use servers::configurator::GrpcRouterConfiguratorRef;
34use servers::http::{HttpServer, HttpServerBuilder};
35use servers::metrics_handler::MetricsHandler;
36use servers::server::Server;
37use snafu::ResultExt;
38use tokio::net::TcpListener;
39use tokio::sync::mpsc::{self, Receiver, Sender};
40use tokio::sync::{Mutex, oneshot};
41use tonic::codec::CompressionEncoding;
42use tonic::transport::server::{Router, TcpIncoming};
43
44use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
45#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
46use crate::election::CANDIDATE_LEASE_SECS;
47use crate::election::etcd::EtcdElection;
48use crate::error::OtherSnafu;
49use crate::metasrv::builder::MetasrvBuilder;
50use crate::metasrv::{
51 BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
52};
53use crate::selector::lease_based::LeaseBasedSelector;
54use crate::selector::load_based::LoadBasedSelector;
55use crate::selector::round_robin::RoundRobinSelector;
56use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
57use crate::selector::{Selector, SelectorType};
58use crate::service::admin;
59use crate::service::admin::admin_axum_router;
60use crate::utils::etcd::create_etcd_client_with_tls;
61use crate::{Result, error};
62
63pub struct MetasrvInstance {
64 metasrv: Arc<Metasrv>,
65
66 http_server: Either<Option<HttpServerBuilder>, HttpServer>,
67
68 opts: MetasrvOptions,
69
70 signal_sender: Option<Sender<()>>,
71
72 plugins: Plugins,
73
74 serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
76
77 bind_addr: Option<SocketAddr>,
79}
80
81impl MetasrvInstance {
82 pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
83 let opts = metasrv.options().clone();
84 let plugins = metasrv.plugins().clone();
85 let metasrv = Arc::new(metasrv);
86
87 let extra_routers = admin_axum_router(metasrv.clone());
89
90 let mut builder = HttpServerBuilder::new(opts.http.clone())
91 .with_metrics_handler(MetricsHandler)
92 .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
93 builder = builder.with_extra_router(extra_routers);
94
95 plugins.insert::<Arc<Metasrv>>(metasrv.clone());
97 Ok(MetasrvInstance {
98 metasrv,
99 http_server: Either::Left(Some(builder)),
100 opts,
101 signal_sender: None,
102 plugins,
103 serve_state: Default::default(),
104 bind_addr: None,
105 })
106 }
107
108 pub async fn start(&mut self) -> Result<()> {
109 if let Some(builder) = self.http_server.as_mut().left()
110 && let Some(builder) = builder.take()
111 {
112 let mut server = builder.build();
113
114 let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
115 addr: &self.opts.http.addr,
116 })?;
117 info!("starting http server at {}", addr);
118 server.start(addr).await.context(error::StartHttpSnafu)?;
119
120 self.http_server = Either::Right(server);
121 } else {
122 return Ok(());
126 };
127
128 self.metasrv.try_start().await?;
129
130 let (tx, rx) = mpsc::channel::<()>(1);
131
132 self.signal_sender = Some(tx);
133
134 let mut router = router(self.metasrv.clone());
136 if let Some(configurator) = self
137 .metasrv
138 .plugins()
139 .get::<GrpcRouterConfiguratorRef<()>>()
140 {
141 router = configurator
142 .configure_grpc_router(router, ())
143 .await
144 .context(OtherSnafu)?;
145 }
146
147 let (serve_state_tx, serve_state_rx) = oneshot::channel();
148
149 let socket_addr =
150 bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
151 .await?;
152 self.bind_addr = Some(socket_addr);
153
154 *self.serve_state.lock().await = Some(serve_state_rx);
155 Ok(())
156 }
157
158 pub async fn shutdown(&self) -> Result<()> {
159 if let Some(mut rx) = self.serve_state.lock().await.take()
160 && let Ok(Err(err)) = rx.try_recv()
161 {
162 common_telemetry::error!(err; "Metasrv start failed")
163 }
164 if let Some(signal) = &self.signal_sender {
165 signal
166 .send(())
167 .await
168 .context(error::SendShutdownSignalSnafu)?;
169 }
170 self.metasrv.shutdown().await?;
171
172 if let Some(http_server) = self.http_server.as_ref().right() {
173 http_server
174 .shutdown()
175 .await
176 .context(error::ShutdownServerSnafu {
177 server: http_server.name(),
178 })?;
179 }
180 Ok(())
181 }
182
183 pub fn plugins(&self) -> Plugins {
184 self.plugins.clone()
185 }
186
187 pub fn get_inner(&self) -> &Metasrv {
188 &self.metasrv
189 }
190 pub fn bind_addr(&self) -> &Option<SocketAddr> {
191 &self.bind_addr
192 }
193
194 pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
195 &mut self.http_server
196 }
197
198 pub fn http_server(&self) -> Option<&HttpServer> {
199 self.http_server.as_ref().right()
200 }
201}
202
203pub async fn bootstrap_metasrv_with_router(
204 bind_addr: &str,
205 router: Router,
206 serve_state_tx: oneshot::Sender<Result<()>>,
207 mut shutdown_rx: Receiver<()>,
208) -> Result<SocketAddr> {
209 let listener = TcpListener::bind(bind_addr)
210 .await
211 .context(error::TcpBindSnafu { addr: bind_addr })?;
212
213 let real_bind_addr = listener
214 .local_addr()
215 .context(error::TcpBindSnafu { addr: bind_addr })?;
216
217 info!("gRPC server is bound to: {}", real_bind_addr);
218
219 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
220
221 let _handle = common_runtime::spawn_global(async move {
222 let result = router
223 .serve_with_incoming_shutdown(incoming, async {
224 let _ = shutdown_rx.recv().await;
225 })
226 .await
227 .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
228 .context(error::StartGrpcSnafu);
229 let _ = serve_state_tx.send(result);
230 });
231
232 Ok(real_bind_addr)
233}
234
/// Adds `$server` to `$builder` via `add_service`, after enabling Gzip and Zstd compression
/// for both accepted requests and sent responses.
///
/// `CompressionEncoding` is referred to by its bare name, so it must be in scope wherever the
/// macro is invoked.
#[macro_export]
macro_rules! add_compressed_service {
    ($builder:expr, $server:expr) => {
        $builder.add_service(
            $server
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
247
248pub fn router(metasrv: Arc<Metasrv>) -> Router {
249 let mut router = tonic::transport::Server::builder()
250 .accept_http1(true)
252 .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
254 .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
255 let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
256 let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
257 let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
258 let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
259 let router = add_compressed_service!(router, ConfigServer::from_arc(metasrv.clone()));
260 router.add_service(admin::make_admin_service(metasrv))
261}
262
263pub async fn metasrv_builder(
264 opts: &MetasrvOptions,
265 plugins: Plugins,
266 kv_backend: Option<KvBackendRef>,
267) -> Result<MetasrvBuilder> {
268 let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
269 (Some(kv_backend), _) => (kv_backend, None),
270 (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
271 (None, BackendImpl::EtcdStore) => {
272 let etcd_client = create_etcd_client_with_tls(
273 &opts.store_addrs,
274 &opts.backend_client,
275 opts.backend_tls.as_ref(),
276 )
277 .await?;
278 let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
279 let election = EtcdElection::with_etcd_client(
280 &opts.grpc.server_addr,
281 etcd_client,
282 opts.store_key_prefix.clone(),
283 )
284 .await?;
285
286 (kv_backend, Some(election))
287 }
288 #[cfg(feature = "pg_kvbackend")]
289 (None, BackendImpl::PostgresStore) => {
290 use std::time::Duration;
291
292 use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
293 use common_meta::kv_backend::rds::PgStore;
294 use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod};
295
296 use crate::election::rds::postgres::{ElectionPgClient, PgElection};
297 use crate::utils::postgres::create_postgres_pool;
298
299 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
300 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
301 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
302 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
303 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
304
305 let mut cfg = Config::new();
306 cfg.keepalives = Some(true);
307 cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
308 cfg.manager = Some(ManagerConfig {
309 recycling_method: RecyclingMethod::Verified,
310 });
311 let pool = create_postgres_pool(
313 &opts.store_addrs,
314 Some(cfg.clone()),
315 opts.backend_tls.clone(),
316 )
317 .await?;
318
319 let election_client = ElectionPgClient::new(
320 pool,
321 execution_timeout,
322 idle_session_timeout,
323 statement_timeout,
324 )?;
325 let election = PgElection::with_pg_client(
326 opts.grpc.server_addr.clone(),
327 election_client,
328 opts.store_key_prefix.clone(),
329 candidate_lease_ttl,
330 meta_lease_ttl,
331 opts.meta_schema_name.as_deref(),
332 &opts.meta_table_name,
333 opts.meta_election_lock_id,
334 )
335 .await?;
336
337 let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
338 .await?;
339 let kv_backend = PgStore::with_pg_pool(
340 pool,
341 opts.meta_schema_name.as_deref(),
342 &opts.meta_table_name,
343 opts.max_txn_ops,
344 opts.auto_create_schema,
345 )
346 .await
347 .context(error::KvBackendSnafu)?;
348
349 (kv_backend, Some(election))
350 }
351 #[cfg(feature = "mysql_kvbackend")]
352 (None, BackendImpl::MysqlStore) => {
353 use std::time::Duration;
354
355 use common_meta::kv_backend::rds::MySqlStore;
356
357 use crate::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
358 use crate::utils::mysql::create_mysql_pool;
359
360 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
361 let kv_backend =
362 MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
363 .await
364 .context(error::KvBackendSnafu)?;
365 let election_table_name = opts.meta_table_name.clone() + "_election";
367 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
369 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
370 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
371 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
372 let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
373 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
374 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
375
376 let election_client = ElectionMysqlClient::new(
377 pool,
378 execution_timeout,
379 statement_timeout,
380 innode_lock_wait_timeout,
381 idle_session_timeout,
382 &election_table_name,
383 );
384 let election = MySqlElection::with_mysql_client(
385 opts.grpc.server_addr.clone(),
386 election_client,
387 opts.store_key_prefix.clone(),
388 candidate_lease_ttl,
389 meta_lease_ttl,
390 &election_table_name,
391 )
392 .await?;
393 (kv_backend, Some(election))
394 }
395 };
396
397 if !opts.store_key_prefix.is_empty() {
398 info!(
399 "using chroot kv backend with prefix: {prefix}",
400 prefix = opts.store_key_prefix
401 );
402 kv_backend = Arc::new(ChrootKvBackend::new(
403 opts.store_key_prefix.clone().into_bytes(),
404 kv_backend,
405 ))
406 }
407
408 let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
409 let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
410
411 let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
412 info!("Using selector from plugins");
413 selector
414 } else {
415 let selector: Arc<
416 dyn Selector<
417 Context = crate::metasrv::SelectorContext,
418 Output = Vec<common_meta::peer::Peer>,
419 >,
420 > = match opts.selector {
421 SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
422 RegionNumsBasedWeightCompute,
423 meta_peer_client.clone(),
424 )) as SelectorRef,
425 SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
426 SelectorType::RoundRobin => {
427 Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
428 }
429 };
430 info!(
431 "Using selector from options, selector type: {}",
432 opts.selector.as_ref()
433 );
434 selector
435 };
436
437 Ok(MetasrvBuilder::new()
438 .options(opts.clone())
439 .kv_backend(kv_backend)
440 .in_memory(in_memory)
441 .selector(selector)
442 .election(election)
443 .meta_peer_client(meta_peer_client)
444 .plugins(plugins))
445}
446
447pub(crate) fn build_default_meta_peer_client(
448 election: &Option<ElectionRef>,
449 in_memory: &ResettableKvBackendRef,
450) -> MetaPeerClientRef {
451 MetaPeerClientBuilder::default()
452 .election(election.clone())
453 .in_memory(in_memory.clone())
454 .build()
455 .map(Arc::new)
456 .unwrap()
458}