1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
25use common_meta::distributed_time_constants::META_LEASE_SECS;
26use common_meta::kv_backend::chroot::ChrootKvBackend;
27use common_meta::kv_backend::etcd::EtcdStore;
28use common_meta::kv_backend::memory::MemoryKvBackend;
29use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
30use common_telemetry::info;
31use either::Either;
32use servers::configurator::ConfiguratorRef;
33use servers::http::{HttpServer, HttpServerBuilder};
34use servers::metrics_handler::MetricsHandler;
35use servers::server::Server;
36use snafu::ResultExt;
37use tokio::net::TcpListener;
38use tokio::sync::mpsc::{self, Receiver, Sender};
39use tokio::sync::{Mutex, oneshot};
40use tonic::codec::CompressionEncoding;
41use tonic::transport::server::{Router, TcpIncoming};
42
43use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
44#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
45use crate::election::CANDIDATE_LEASE_SECS;
46use crate::election::etcd::EtcdElection;
47use crate::metasrv::builder::MetasrvBuilder;
48use crate::metasrv::{
49 BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
50};
51use crate::selector::SelectorType;
52use crate::selector::lease_based::LeaseBasedSelector;
53use crate::selector::load_based::LoadBasedSelector;
54use crate::selector::round_robin::RoundRobinSelector;
55use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
56use crate::service::admin;
57use crate::service::admin::admin_axum_router;
58use crate::utils::etcd::create_etcd_client_with_tls;
59use crate::{Result, error};
60
/// A metasrv bundled with the HTTP server and the gRPC serving state that
/// [`MetasrvInstance::start`] and [`MetasrvInstance::shutdown`] manage.
pub struct MetasrvInstance {
    /// The metasrv shared with the HTTP admin router and the gRPC services.
    metasrv: Arc<Metasrv>,

    /// `Left` holds the HTTP server builder until `start` takes it;
    /// `Right` holds the HTTP server once it has been started.
    http_server: Either<Option<HttpServerBuilder>, HttpServer>,

    /// The options cloned from the metasrv when the instance was created.
    opts: MetasrvOptions,

    /// Sender used by `shutdown` to tell the gRPC serving task to stop.
    /// `None` until `start` creates the channel.
    signal_sender: Option<Sender<()>>,

    /// The plugins cloned from the metasrv when the instance was created.
    plugins: Plugins,

    /// Receiver for the final result of the gRPC serving task.
    /// Only set once `start` has spawned that task.
    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,

    /// The address the gRPC listener is actually bound to; `None` until `start` has bound it.
    bind_addr: Option<SocketAddr>,
}
78
79impl MetasrvInstance {
80 pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
81 let opts = metasrv.options().clone();
82 let plugins = metasrv.plugins().clone();
83 let metasrv = Arc::new(metasrv);
84
85 let extra_routers = admin_axum_router(metasrv.clone());
87
88 let mut builder = HttpServerBuilder::new(opts.http.clone())
89 .with_metrics_handler(MetricsHandler)
90 .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
91 builder = builder.with_extra_router(extra_routers);
92
93 plugins.insert::<Arc<Metasrv>>(metasrv.clone());
95 Ok(MetasrvInstance {
96 metasrv,
97 http_server: Either::Left(Some(builder)),
98 opts,
99 signal_sender: None,
100 plugins,
101 serve_state: Default::default(),
102 bind_addr: None,
103 })
104 }
105
106 pub async fn start(&mut self) -> Result<()> {
107 if let Some(builder) = self.http_server.as_mut().left()
108 && let Some(builder) = builder.take()
109 {
110 let mut server = builder.build();
111
112 let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
113 addr: &self.opts.http.addr,
114 })?;
115 info!("starting http server at {}", addr);
116 server.start(addr).await.context(error::StartHttpSnafu)?;
117
118 self.http_server = Either::Right(server);
119 } else {
120 return Ok(());
124 };
125
126 self.metasrv.try_start().await?;
127
128 let (tx, rx) = mpsc::channel::<()>(1);
129
130 self.signal_sender = Some(tx);
131
132 let mut router = router(self.metasrv.clone());
134 if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
135 router = configurator.config_grpc(router);
136 }
137
138 let (serve_state_tx, serve_state_rx) = oneshot::channel();
139
140 let socket_addr =
141 bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
142 .await?;
143 self.bind_addr = Some(socket_addr);
144
145 *self.serve_state.lock().await = Some(serve_state_rx);
146 Ok(())
147 }
148
149 pub async fn shutdown(&self) -> Result<()> {
150 if let Some(mut rx) = self.serve_state.lock().await.take()
151 && let Ok(Err(err)) = rx.try_recv()
152 {
153 common_telemetry::error!(err; "Metasrv start failed")
154 }
155 if let Some(signal) = &self.signal_sender {
156 signal
157 .send(())
158 .await
159 .context(error::SendShutdownSignalSnafu)?;
160 }
161 self.metasrv.shutdown().await?;
162
163 if let Some(http_server) = self.http_server.as_ref().right() {
164 http_server
165 .shutdown()
166 .await
167 .context(error::ShutdownServerSnafu {
168 server: http_server.name(),
169 })?;
170 }
171 Ok(())
172 }
173
174 pub fn plugins(&self) -> Plugins {
175 self.plugins.clone()
176 }
177
178 pub fn get_inner(&self) -> &Metasrv {
179 &self.metasrv
180 }
181 pub fn bind_addr(&self) -> &Option<SocketAddr> {
182 &self.bind_addr
183 }
184
185 pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
186 &mut self.http_server
187 }
188
189 pub fn http_server(&self) -> Option<&HttpServer> {
190 self.http_server.as_ref().right()
191 }
192}
193
194pub async fn bootstrap_metasrv_with_router(
195 bind_addr: &str,
196 router: Router,
197 serve_state_tx: oneshot::Sender<Result<()>>,
198 mut shutdown_rx: Receiver<()>,
199) -> Result<SocketAddr> {
200 let listener = TcpListener::bind(bind_addr)
201 .await
202 .context(error::TcpBindSnafu { addr: bind_addr })?;
203
204 let real_bind_addr = listener
205 .local_addr()
206 .context(error::TcpBindSnafu { addr: bind_addr })?;
207
208 info!("gRPC server is bound to: {}", real_bind_addr);
209
210 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
211
212 let _handle = common_runtime::spawn_global(async move {
213 let result = router
214 .serve_with_incoming_shutdown(incoming, async {
215 let _ = shutdown_rx.recv().await;
216 })
217 .await
218 .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
219 .context(error::StartGrpcSnafu);
220 let _ = serve_state_tx.send(result);
221 });
222
223 Ok(real_bind_addr)
224}
225
/// Adds `$server` to `$builder` via `add_service`, after configuring it to accept and send
/// both gzip- and zstd-compressed messages.
///
/// `CompressionEncoding` is referenced unqualified, so it must be in scope where the macro
/// is invoked.
#[macro_export]
macro_rules! add_compressed_service {
    ($builder:expr, $server:expr) => {
        $builder.add_service(
            $server
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
238
239pub fn router(metasrv: Arc<Metasrv>) -> Router {
240 let mut router = tonic::transport::Server::builder().accept_http1(true); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
242 let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
243 let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
244 let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
245 router.add_service(admin::make_admin_service(metasrv))
246}
247
248pub async fn metasrv_builder(
249 opts: &MetasrvOptions,
250 plugins: Plugins,
251 kv_backend: Option<KvBackendRef>,
252) -> Result<MetasrvBuilder> {
253 let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
254 (Some(kv_backend), _) => (kv_backend, None),
255 (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
256 (None, BackendImpl::EtcdStore) => {
257 let etcd_client =
258 create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
259 let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
260 let election = EtcdElection::with_etcd_client(
261 &opts.grpc.server_addr,
262 etcd_client,
263 opts.store_key_prefix.clone(),
264 )
265 .await?;
266
267 (kv_backend, Some(election))
268 }
269 #[cfg(feature = "pg_kvbackend")]
270 (None, BackendImpl::PostgresStore) => {
271 use std::time::Duration;
272
273 use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
274 use common_meta::kv_backend::rds::PgStore;
275 use deadpool_postgres::Config;
276
277 use crate::election::rds::postgres::{ElectionPgClient, PgElection};
278 use crate::utils::postgres::create_postgres_pool;
279
280 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
281 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
282 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
283 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
284 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
285
286 let mut cfg = Config::new();
287 cfg.keepalives = Some(true);
288 cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
289 let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
291 .await?;
292
293 let election_client = ElectionPgClient::new(
294 pool,
295 execution_timeout,
296 idle_session_timeout,
297 statement_timeout,
298 )?;
299 let election = PgElection::with_pg_client(
300 opts.grpc.server_addr.clone(),
301 election_client,
302 opts.store_key_prefix.clone(),
303 candidate_lease_ttl,
304 meta_lease_ttl,
305 opts.meta_schema_name.as_deref(),
306 &opts.meta_table_name,
307 opts.meta_election_lock_id,
308 )
309 .await?;
310
311 let pool =
312 create_postgres_pool(&opts.store_addrs, None, opts.backend_tls.clone()).await?;
313 let kv_backend = PgStore::with_pg_pool(
314 pool,
315 opts.meta_schema_name.as_deref(),
316 &opts.meta_table_name,
317 opts.max_txn_ops,
318 )
319 .await
320 .context(error::KvBackendSnafu)?;
321
322 (kv_backend, Some(election))
323 }
324 #[cfg(feature = "mysql_kvbackend")]
325 (None, BackendImpl::MysqlStore) => {
326 use std::time::Duration;
327
328 use common_meta::kv_backend::rds::MySqlStore;
329
330 use crate::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
331 use crate::utils::mysql::create_mysql_pool;
332
333 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
334 let kv_backend =
335 MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
336 .await
337 .context(error::KvBackendSnafu)?;
338 let election_table_name = opts.meta_table_name.clone() + "_election";
340 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
342 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
343 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
344 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
345 let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
346 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
347 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
348
349 let election_client = ElectionMysqlClient::new(
350 pool,
351 execution_timeout,
352 statement_timeout,
353 innode_lock_wait_timeout,
354 idle_session_timeout,
355 &election_table_name,
356 );
357 let election = MySqlElection::with_mysql_client(
358 opts.grpc.server_addr.clone(),
359 election_client,
360 opts.store_key_prefix.clone(),
361 candidate_lease_ttl,
362 meta_lease_ttl,
363 &election_table_name,
364 )
365 .await?;
366 (kv_backend, Some(election))
367 }
368 };
369
370 if !opts.store_key_prefix.is_empty() {
371 info!(
372 "using chroot kv backend with prefix: {prefix}",
373 prefix = opts.store_key_prefix
374 );
375 kv_backend = Arc::new(ChrootKvBackend::new(
376 opts.store_key_prefix.clone().into_bytes(),
377 kv_backend,
378 ))
379 }
380
381 let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
382 let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
383
384 let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
385 info!("Using selector from plugins");
386 selector
387 } else {
388 let selector = match opts.selector {
389 SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
390 RegionNumsBasedWeightCompute,
391 meta_peer_client.clone(),
392 )) as SelectorRef,
393 SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
394 SelectorType::RoundRobin => {
395 Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
396 }
397 };
398 info!(
399 "Using selector from options, selector type: {}",
400 opts.selector.as_ref()
401 );
402 selector
403 };
404
405 Ok(MetasrvBuilder::new()
406 .options(opts.clone())
407 .kv_backend(kv_backend)
408 .in_memory(in_memory)
409 .selector(selector)
410 .election(election)
411 .meta_peer_client(meta_peer_client)
412 .plugins(plugins))
413}
414
415pub(crate) fn build_default_meta_peer_client(
416 election: &Option<ElectionRef>,
417 in_memory: &ResettableKvBackendRef,
418) -> MetaPeerClientRef {
419 MetaPeerClientBuilder::default()
420 .election(election.clone())
421 .in_memory(in_memory.clone())
422 .build()
423 .map(Arc::new)
424 .unwrap()
426}