1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
25use common_meta::distributed_time_constants::META_LEASE_SECS;
26use common_meta::kv_backend::chroot::ChrootKvBackend;
27use common_meta::kv_backend::etcd::EtcdStore;
28use common_meta::kv_backend::memory::MemoryKvBackend;
29use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
30use common_telemetry::info;
31use either::Either;
32use servers::configurator::ConfiguratorRef;
33use servers::export_metrics::ExportMetricsTask;
34use servers::http::{HttpServer, HttpServerBuilder};
35use servers::metrics_handler::MetricsHandler;
36use servers::server::Server;
37use snafu::ResultExt;
38use tokio::net::TcpListener;
39use tokio::sync::mpsc::{self, Receiver, Sender};
40use tokio::sync::{Mutex, oneshot};
41use tonic::codec::CompressionEncoding;
42use tonic::transport::server::{Router, TcpIncoming};
43
44use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
45#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
46use crate::election::CANDIDATE_LEASE_SECS;
47use crate::election::etcd::EtcdElection;
48use crate::metasrv::builder::MetasrvBuilder;
49use crate::metasrv::{
50 BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
51};
52use crate::selector::SelectorType;
53use crate::selector::lease_based::LeaseBasedSelector;
54use crate::selector::load_based::LoadBasedSelector;
55use crate::selector::round_robin::RoundRobinSelector;
56use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
57use crate::service::admin;
58use crate::service::admin::admin_axum_router;
59use crate::utils::etcd::create_etcd_client_with_tls;
60use crate::{Result, error};
61
/// A startable/stoppable Metasrv: the shared [`Metasrv`] plus its HTTP server
/// and the state needed to run and shut down the gRPC server.
pub struct MetasrvInstance {
    /// The metasrv, shared with the routers and (via `new`) the plugins.
    metasrv: Arc<Metasrv>,

    /// `Left(Some(builder))` after `new`; `start` takes the builder out and,
    /// once the HTTP server is started, stores it as `Right(server)`.
    http_server: Either<Option<HttpServerBuilder>, HttpServer>,

    /// Options cloned from the metasrv at construction time.
    opts: MetasrvOptions,

    /// Sender half of the shutdown channel; set by `start`, used by `shutdown`.
    signal_sender: Option<Sender<()>>,

    /// Plugins cloned from the metasrv at construction time.
    plugins: Plugins,

    /// Optional metrics export task, created in `new` and started in `start`.
    export_metrics_task: Option<ExportMetricsTask>,

    /// Receiver for the result of the gRPC serve task; set by `start` and
    /// taken (and inspected) by `shutdown`.
    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,

    /// Address the gRPC listener is actually bound to; `None` until `start`
    /// has bound the listener.
    bind_addr: Option<SocketAddr>,
}
81
82impl MetasrvInstance {
83 pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
84 let opts = metasrv.options().clone();
85 let plugins = metasrv.plugins().clone();
86 let metasrv = Arc::new(metasrv);
87
88 let extra_routers = admin_axum_router(metasrv.clone());
90
91 let mut builder = HttpServerBuilder::new(opts.http.clone())
92 .with_metrics_handler(MetricsHandler)
93 .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
94 builder = builder.with_extra_router(extra_routers);
95
96 plugins.insert::<Arc<Metasrv>>(metasrv.clone());
98 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
99 .context(error::InitExportMetricsTaskSnafu)?;
100 Ok(MetasrvInstance {
101 metasrv,
102 http_server: Either::Left(Some(builder)),
103 opts,
104 signal_sender: None,
105 plugins,
106 export_metrics_task,
107 serve_state: Default::default(),
108 bind_addr: None,
109 })
110 }
111
112 pub async fn start(&mut self) -> Result<()> {
113 if let Some(builder) = self.http_server.as_mut().left()
114 && let Some(builder) = builder.take()
115 {
116 let mut server = builder.build();
117
118 let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
119 addr: &self.opts.http.addr,
120 })?;
121 info!("starting http server at {}", addr);
122 server.start(addr).await.context(error::StartHttpSnafu)?;
123
124 self.http_server = Either::Right(server);
125 } else {
126 return Ok(());
130 };
131
132 self.metasrv.try_start().await?;
133
134 if let Some(t) = self.export_metrics_task.as_ref() {
135 t.start(None).context(error::InitExportMetricsTaskSnafu)?
136 }
137
138 let (tx, rx) = mpsc::channel::<()>(1);
139
140 self.signal_sender = Some(tx);
141
142 let mut router = router(self.metasrv.clone());
144 if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
145 router = configurator.config_grpc(router);
146 }
147
148 let (serve_state_tx, serve_state_rx) = oneshot::channel();
149
150 let socket_addr =
151 bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
152 .await?;
153 self.bind_addr = Some(socket_addr);
154
155 *self.serve_state.lock().await = Some(serve_state_rx);
156 Ok(())
157 }
158
159 pub async fn shutdown(&self) -> Result<()> {
160 if let Some(mut rx) = self.serve_state.lock().await.take()
161 && let Ok(Err(err)) = rx.try_recv()
162 {
163 common_telemetry::error!(err; "Metasrv start failed")
164 }
165 if let Some(signal) = &self.signal_sender {
166 signal
167 .send(())
168 .await
169 .context(error::SendShutdownSignalSnafu)?;
170 }
171 self.metasrv.shutdown().await?;
172
173 if let Some(http_server) = self.http_server.as_ref().right() {
174 http_server
175 .shutdown()
176 .await
177 .context(error::ShutdownServerSnafu {
178 server: http_server.name(),
179 })?;
180 }
181 Ok(())
182 }
183
184 pub fn plugins(&self) -> Plugins {
185 self.plugins.clone()
186 }
187
188 pub fn get_inner(&self) -> &Metasrv {
189 &self.metasrv
190 }
191 pub fn bind_addr(&self) -> &Option<SocketAddr> {
192 &self.bind_addr
193 }
194
195 pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
196 &mut self.http_server
197 }
198
199 pub fn http_server(&self) -> Option<&HttpServer> {
200 self.http_server.as_ref().right()
201 }
202}
203
204pub async fn bootstrap_metasrv_with_router(
205 bind_addr: &str,
206 router: Router,
207 serve_state_tx: oneshot::Sender<Result<()>>,
208 mut shutdown_rx: Receiver<()>,
209) -> Result<SocketAddr> {
210 let listener = TcpListener::bind(bind_addr)
211 .await
212 .context(error::TcpBindSnafu { addr: bind_addr })?;
213
214 let real_bind_addr = listener
215 .local_addr()
216 .context(error::TcpBindSnafu { addr: bind_addr })?;
217
218 info!("gRPC server is bound to: {}", real_bind_addr);
219
220 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
221
222 let _handle = common_runtime::spawn_global(async move {
223 let result = router
224 .serve_with_incoming_shutdown(incoming, async {
225 let _ = shutdown_rx.recv().await;
226 })
227 .await
228 .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
229 .context(error::StartGrpcSnafu);
230 let _ = serve_state_tx.send(result);
231 });
232
233 Ok(real_bind_addr)
234}
235
/// Registers a service on a builder/router through `add_service`, after
/// enabling Gzip and Zstd for both accepted and sent compression.
///
/// `CompressionEncoding` must be in scope at the call site.
#[macro_export]
macro_rules! add_compressed_service {
    ($target:expr, $service:expr) => {
        $target.add_service(
            $service
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
248
249pub fn router(metasrv: Arc<Metasrv>) -> Router {
250 let mut router = tonic::transport::Server::builder().accept_http1(true); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
252 let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
253 let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
254 let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
255 router.add_service(admin::make_admin_service(metasrv))
256}
257
258pub async fn metasrv_builder(
259 opts: &MetasrvOptions,
260 plugins: Plugins,
261 kv_backend: Option<KvBackendRef>,
262) -> Result<MetasrvBuilder> {
263 let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
264 (Some(kv_backend), _) => (kv_backend, None),
265 (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
266 (None, BackendImpl::EtcdStore) => {
267 let etcd_client =
268 create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
269 let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
270 let election = EtcdElection::with_etcd_client(
271 &opts.grpc.server_addr,
272 etcd_client,
273 opts.store_key_prefix.clone(),
274 )
275 .await?;
276
277 (kv_backend, Some(election))
278 }
279 #[cfg(feature = "pg_kvbackend")]
280 (None, BackendImpl::PostgresStore) => {
281 use std::time::Duration;
282
283 use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
284 use common_meta::kv_backend::rds::PgStore;
285 use deadpool_postgres::Config;
286
287 use crate::election::rds::postgres::{ElectionPgClient, PgElection};
288 use crate::utils::postgres::create_postgres_pool;
289
290 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
291 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
292 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
293 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
294 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
295
296 let mut cfg = Config::new();
297 cfg.keepalives = Some(true);
298 cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
299 let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
301 .await?;
302
303 let election_client = ElectionPgClient::new(
304 pool,
305 execution_timeout,
306 idle_session_timeout,
307 statement_timeout,
308 )?;
309 let election = PgElection::with_pg_client(
310 opts.grpc.server_addr.clone(),
311 election_client,
312 opts.store_key_prefix.clone(),
313 candidate_lease_ttl,
314 meta_lease_ttl,
315 opts.meta_schema_name.as_deref(),
316 &opts.meta_table_name,
317 opts.meta_election_lock_id,
318 )
319 .await?;
320
321 let pool =
322 create_postgres_pool(&opts.store_addrs, None, opts.backend_tls.clone()).await?;
323 let kv_backend = PgStore::with_pg_pool(
324 pool,
325 opts.meta_schema_name.as_deref(),
326 &opts.meta_table_name,
327 opts.max_txn_ops,
328 )
329 .await
330 .context(error::KvBackendSnafu)?;
331
332 (kv_backend, Some(election))
333 }
334 #[cfg(feature = "mysql_kvbackend")]
335 (None, BackendImpl::MysqlStore) => {
336 use std::time::Duration;
337
338 use common_meta::kv_backend::rds::MySqlStore;
339
340 use crate::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
341 use crate::utils::mysql::create_mysql_pool;
342
343 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
344 let kv_backend =
345 MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
346 .await
347 .context(error::KvBackendSnafu)?;
348 let election_table_name = opts.meta_table_name.clone() + "_election";
350 let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
352 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
353 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
354 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
355 let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
356 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
357 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
358
359 let election_client = ElectionMysqlClient::new(
360 pool,
361 execution_timeout,
362 statement_timeout,
363 innode_lock_wait_timeout,
364 idle_session_timeout,
365 &election_table_name,
366 );
367 let election = MySqlElection::with_mysql_client(
368 opts.grpc.server_addr.clone(),
369 election_client,
370 opts.store_key_prefix.clone(),
371 candidate_lease_ttl,
372 meta_lease_ttl,
373 &election_table_name,
374 )
375 .await?;
376 (kv_backend, Some(election))
377 }
378 };
379
380 if !opts.store_key_prefix.is_empty() {
381 info!(
382 "using chroot kv backend with prefix: {prefix}",
383 prefix = opts.store_key_prefix
384 );
385 kv_backend = Arc::new(ChrootKvBackend::new(
386 opts.store_key_prefix.clone().into_bytes(),
387 kv_backend,
388 ))
389 }
390
391 let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
392 let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
393
394 let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
395 info!("Using selector from plugins");
396 selector
397 } else {
398 let selector = match opts.selector {
399 SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
400 RegionNumsBasedWeightCompute,
401 meta_peer_client.clone(),
402 )) as SelectorRef,
403 SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
404 SelectorType::RoundRobin => {
405 Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
406 }
407 };
408 info!(
409 "Using selector from options, selector type: {}",
410 opts.selector.as_ref()
411 );
412 selector
413 };
414
415 Ok(MetasrvBuilder::new()
416 .options(opts.clone())
417 .kv_backend(kv_backend)
418 .in_memory(in_memory)
419 .selector(selector)
420 .election(election)
421 .meta_peer_client(meta_peer_client)
422 .plugins(plugins))
423}
424
425pub(crate) fn build_default_meta_peer_client(
426 election: &Option<ElectionRef>,
427 in_memory: &ResettableKvBackendRef,
428) -> MetaPeerClientRef {
429 MetaPeerClientBuilder::default()
430 .election(election.clone())
431 .in_memory(in_memory.clone())
432 .build()
433 .map(Arc::new)
434 .unwrap()
436}