1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
25use common_meta::distributed_time_constants::META_LEASE_SECS;
26use common_meta::kv_backend::chroot::ChrootKvBackend;
27use common_meta::kv_backend::etcd::EtcdStore;
28use common_meta::kv_backend::memory::MemoryKvBackend;
29use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
30use common_telemetry::info;
31use either::Either;
32use servers::configurator::GrpcRouterConfiguratorRef;
33use servers::http::{HttpServer, HttpServerBuilder};
34use servers::metrics_handler::MetricsHandler;
35use servers::server::Server;
36use snafu::ResultExt;
37use tokio::net::TcpListener;
38use tokio::sync::mpsc::{self, Receiver, Sender};
39use tokio::sync::{Mutex, oneshot};
40use tonic::codec::CompressionEncoding;
41use tonic::transport::server::{Router, TcpIncoming};
42
43use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
44#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
45use crate::election::CANDIDATE_LEASE_SECS;
46use crate::election::etcd::EtcdElection;
47use crate::error::OtherSnafu;
48use crate::metasrv::builder::MetasrvBuilder;
49use crate::metasrv::{
50 BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
51};
52use crate::selector::lease_based::LeaseBasedSelector;
53use crate::selector::load_based::LoadBasedSelector;
54use crate::selector::round_robin::RoundRobinSelector;
55use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
56use crate::selector::{Selector, SelectorType};
57use crate::service::admin;
58use crate::service::admin::admin_axum_router;
59use crate::utils::etcd::create_etcd_client_with_tls;
60use crate::{Result, error};
61
62pub struct MetasrvInstance {
63 metasrv: Arc<Metasrv>,
64
65 http_server: Either<Option<HttpServerBuilder>, HttpServer>,
66
67 opts: MetasrvOptions,
68
69 signal_sender: Option<Sender<()>>,
70
71 plugins: Plugins,
72
73 serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
75
76 bind_addr: Option<SocketAddr>,
78}
79
80impl MetasrvInstance {
81 pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
82 let opts = metasrv.options().clone();
83 let plugins = metasrv.plugins().clone();
84 let metasrv = Arc::new(metasrv);
85
86 let extra_routers = admin_axum_router(metasrv.clone());
88
89 let mut builder = HttpServerBuilder::new(opts.http.clone())
90 .with_metrics_handler(MetricsHandler)
91 .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
92 builder = builder.with_extra_router(extra_routers);
93
94 plugins.insert::<Arc<Metasrv>>(metasrv.clone());
96 Ok(MetasrvInstance {
97 metasrv,
98 http_server: Either::Left(Some(builder)),
99 opts,
100 signal_sender: None,
101 plugins,
102 serve_state: Default::default(),
103 bind_addr: None,
104 })
105 }
106
107 pub async fn start(&mut self) -> Result<()> {
108 if let Some(builder) = self.http_server.as_mut().left()
109 && let Some(builder) = builder.take()
110 {
111 let mut server = builder.build();
112
113 let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
114 addr: &self.opts.http.addr,
115 })?;
116 info!("starting http server at {}", addr);
117 server.start(addr).await.context(error::StartHttpSnafu)?;
118
119 self.http_server = Either::Right(server);
120 } else {
121 return Ok(());
125 };
126
127 self.metasrv.try_start().await?;
128
129 let (tx, rx) = mpsc::channel::<()>(1);
130
131 self.signal_sender = Some(tx);
132
133 let mut router = router(self.metasrv.clone());
135 if let Some(configurator) = self
136 .metasrv
137 .plugins()
138 .get::<GrpcRouterConfiguratorRef<()>>()
139 {
140 router = configurator
141 .configure_grpc_router(router, ())
142 .await
143 .context(OtherSnafu)?;
144 }
145
146 let (serve_state_tx, serve_state_rx) = oneshot::channel();
147
148 let socket_addr =
149 bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
150 .await?;
151 self.bind_addr = Some(socket_addr);
152
153 *self.serve_state.lock().await = Some(serve_state_rx);
154 Ok(())
155 }
156
157 pub async fn shutdown(&self) -> Result<()> {
158 if let Some(mut rx) = self.serve_state.lock().await.take()
159 && let Ok(Err(err)) = rx.try_recv()
160 {
161 common_telemetry::error!(err; "Metasrv start failed")
162 }
163 if let Some(signal) = &self.signal_sender {
164 signal
165 .send(())
166 .await
167 .context(error::SendShutdownSignalSnafu)?;
168 }
169 self.metasrv.shutdown().await?;
170
171 if let Some(http_server) = self.http_server.as_ref().right() {
172 http_server
173 .shutdown()
174 .await
175 .context(error::ShutdownServerSnafu {
176 server: http_server.name(),
177 })?;
178 }
179 Ok(())
180 }
181
182 pub fn plugins(&self) -> Plugins {
183 self.plugins.clone()
184 }
185
186 pub fn get_inner(&self) -> &Metasrv {
187 &self.metasrv
188 }
189 pub fn bind_addr(&self) -> &Option<SocketAddr> {
190 &self.bind_addr
191 }
192
193 pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
194 &mut self.http_server
195 }
196
197 pub fn http_server(&self) -> Option<&HttpServer> {
198 self.http_server.as_ref().right()
199 }
200}
201
202pub async fn bootstrap_metasrv_with_router(
203 bind_addr: &str,
204 router: Router,
205 serve_state_tx: oneshot::Sender<Result<()>>,
206 mut shutdown_rx: Receiver<()>,
207) -> Result<SocketAddr> {
208 let listener = TcpListener::bind(bind_addr)
209 .await
210 .context(error::TcpBindSnafu { addr: bind_addr })?;
211
212 let real_bind_addr = listener
213 .local_addr()
214 .context(error::TcpBindSnafu { addr: bind_addr })?;
215
216 info!("gRPC server is bound to: {}", real_bind_addr);
217
218 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
219
220 let _handle = common_runtime::spawn_global(async move {
221 let result = router
222 .serve_with_incoming_shutdown(incoming, async {
223 let _ = shutdown_rx.recv().await;
224 })
225 .await
226 .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
227 .context(error::StartGrpcSnafu);
228 let _ = serve_state_tx.send(result);
229 });
230
231 Ok(real_bind_addr)
232}
233
/// Adds a service to a router/builder after enabling Gzip and Zstd compression on it, both
/// for accepted requests and for sent responses.
///
/// `CompressionEncoding` must be in scope at the call site.
#[macro_export]
macro_rules! add_compressed_service {
    ($router:expr, $service:expr) => {
        $router.add_service(
            $service
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
246
247pub fn router(metasrv: Arc<Metasrv>) -> Router {
248 let mut router = tonic::transport::Server::builder()
249 .accept_http1(true)
251 .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
253 .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
254 let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
255 let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
256 let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
257 let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
258 router.add_service(admin::make_admin_service(metasrv))
259}
260
261pub async fn metasrv_builder(
262 opts: &MetasrvOptions,
263 plugins: Plugins,
264 kv_backend: Option<KvBackendRef>,
265) -> Result<MetasrvBuilder> {
266 let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
267 (Some(kv_backend), _) => (kv_backend, None),
268 (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
269 (None, BackendImpl::EtcdStore) => {
270 let etcd_client = create_etcd_client_with_tls(
271 &opts.store_addrs,
272 &opts.backend_client,
273 opts.backend_tls.as_ref(),
274 )
275 .await?;
276 let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
277 let election = EtcdElection::with_etcd_client(
278 &opts.grpc.server_addr,
279 etcd_client,
280 opts.store_key_prefix.clone(),
281 )
282 .await?;
283
284 (kv_backend, Some(election))
285 }
286 #[cfg(feature = "pg_kvbackend")]
287 (None, BackendImpl::PostgresStore) => {
288 use std::time::Duration;
289
290 use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
291 use common_meta::kv_backend::rds::PgStore;
292 use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod};
293
294 use crate::election::rds::postgres::{ElectionPgClient, PgElection};
295 use crate::utils::postgres::create_postgres_pool;
296
297 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
298 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
299 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
300 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
301 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
302
303 let mut cfg = Config::new();
304 cfg.keepalives = Some(true);
305 cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
306 cfg.manager = Some(ManagerConfig {
307 recycling_method: RecyclingMethod::Verified,
308 });
309 let pool = create_postgres_pool(
311 &opts.store_addrs,
312 Some(cfg.clone()),
313 opts.backend_tls.clone(),
314 )
315 .await?;
316
317 let election_client = ElectionPgClient::new(
318 pool,
319 execution_timeout,
320 idle_session_timeout,
321 statement_timeout,
322 )?;
323 let election = PgElection::with_pg_client(
324 opts.grpc.server_addr.clone(),
325 election_client,
326 opts.store_key_prefix.clone(),
327 candidate_lease_ttl,
328 meta_lease_ttl,
329 opts.meta_schema_name.as_deref(),
330 &opts.meta_table_name,
331 opts.meta_election_lock_id,
332 )
333 .await?;
334
335 let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
336 .await?;
337 let kv_backend = PgStore::with_pg_pool(
338 pool,
339 opts.meta_schema_name.as_deref(),
340 &opts.meta_table_name,
341 opts.max_txn_ops,
342 opts.auto_create_schema,
343 )
344 .await
345 .context(error::KvBackendSnafu)?;
346
347 (kv_backend, Some(election))
348 }
349 #[cfg(feature = "mysql_kvbackend")]
350 (None, BackendImpl::MysqlStore) => {
351 use std::time::Duration;
352
353 use common_meta::kv_backend::rds::MySqlStore;
354
355 use crate::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
356 use crate::utils::mysql::create_mysql_pool;
357
358 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
359 let kv_backend =
360 MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
361 .await
362 .context(error::KvBackendSnafu)?;
363 let election_table_name = opts.meta_table_name.clone() + "_election";
365 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
367 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
368 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
369 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
370 let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
371 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
372 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
373
374 let election_client = ElectionMysqlClient::new(
375 pool,
376 execution_timeout,
377 statement_timeout,
378 innode_lock_wait_timeout,
379 idle_session_timeout,
380 &election_table_name,
381 );
382 let election = MySqlElection::with_mysql_client(
383 opts.grpc.server_addr.clone(),
384 election_client,
385 opts.store_key_prefix.clone(),
386 candidate_lease_ttl,
387 meta_lease_ttl,
388 &election_table_name,
389 )
390 .await?;
391 (kv_backend, Some(election))
392 }
393 };
394
395 if !opts.store_key_prefix.is_empty() {
396 info!(
397 "using chroot kv backend with prefix: {prefix}",
398 prefix = opts.store_key_prefix
399 );
400 kv_backend = Arc::new(ChrootKvBackend::new(
401 opts.store_key_prefix.clone().into_bytes(),
402 kv_backend,
403 ))
404 }
405
406 let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
407 let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
408
409 let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
410 info!("Using selector from plugins");
411 selector
412 } else {
413 let selector: Arc<
414 dyn Selector<
415 Context = crate::metasrv::SelectorContext,
416 Output = Vec<common_meta::peer::Peer>,
417 >,
418 > = match opts.selector {
419 SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
420 RegionNumsBasedWeightCompute,
421 meta_peer_client.clone(),
422 )) as SelectorRef,
423 SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
424 SelectorType::RoundRobin => {
425 Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
426 }
427 };
428 info!(
429 "Using selector from options, selector type: {}",
430 opts.selector.as_ref()
431 );
432 selector
433 };
434
435 Ok(MetasrvBuilder::new()
436 .options(opts.clone())
437 .kv_backend(kv_backend)
438 .in_memory(in_memory)
439 .selector(selector)
440 .election(election)
441 .meta_peer_client(meta_peer_client)
442 .plugins(plugins))
443}
444
445pub(crate) fn build_default_meta_peer_client(
446 election: &Option<ElectionRef>,
447 in_memory: &ResettableKvBackendRef,
448) -> MetaPeerClientRef {
449 MetaPeerClientBuilder::default()
450 .election(election.clone())
451 .in_memory(in_memory.clone())
452 .build()
453 .map(Arc::new)
454 .unwrap()
456}