meta_srv/
bootstrap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::meta::cluster_server::ClusterServer;
20use api::v1::meta::heartbeat_server::HeartbeatServer;
21use api::v1::meta::procedure_service_server::ProcedureServiceServer;
22use api::v1::meta::store_server::StoreServer;
23use common_base::Plugins;
24use common_config::Configurable;
25#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
26use common_meta::distributed_time_constants::META_LEASE_SECS;
27use common_meta::kv_backend::chroot::ChrootKvBackend;
28use common_meta::kv_backend::etcd::EtcdStore;
29use common_meta::kv_backend::memory::MemoryKvBackend;
30use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
31use common_telemetry::info;
32use either::Either;
33use servers::configurator::GrpcRouterConfiguratorRef;
34use servers::http::{HttpServer, HttpServerBuilder};
35use servers::metrics_handler::MetricsHandler;
36use servers::server::Server;
37use snafu::ResultExt;
38use tokio::net::TcpListener;
39use tokio::sync::mpsc::{self, Receiver, Sender};
40use tokio::sync::{Mutex, oneshot};
41use tonic::codec::CompressionEncoding;
42use tonic::transport::server::{Router, TcpIncoming};
43
44use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
45#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
46use crate::election::CANDIDATE_LEASE_SECS;
47use crate::election::etcd::EtcdElection;
48use crate::error::OtherSnafu;
49use crate::metasrv::builder::MetasrvBuilder;
50use crate::metasrv::{
51    BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
52};
53use crate::selector::lease_based::LeaseBasedSelector;
54use crate::selector::load_based::LoadBasedSelector;
55use crate::selector::round_robin::RoundRobinSelector;
56use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
57use crate::selector::{Selector, SelectorType};
58use crate::service::admin;
59use crate::service::admin::admin_axum_router;
60use crate::utils::etcd::create_etcd_client_with_tls;
61use crate::{Result, error};
62
/// Default interval between HTTP/2 keep-alive pings on gRPC connections (10 s).
const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_millis(10_000);
/// Default time to wait for an HTTP/2 keep-alive ping acknowledgement on gRPC connections (10 s).
const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_millis(10_000);
67
/// A runnable metasrv: the shared [`Metasrv`] together with the HTTP and gRPC servers that
/// expose it.
pub struct MetasrvInstance {
    /// The wrapped metasrv. `new` also inserts a clone of this `Arc` into the plugins.
    metasrv: Arc<Metasrv>,

    /// The HTTP server.
    ///
    /// It is `Left(Some(builder))` after `new`. `start` takes the builder, which leaves
    /// `Left(None)`. It is replaced with `Right(server)` only once the HTTP server has started
    /// successfully. If starting it failed, the field stays `Left(None)`.
    http_server: Either<Option<HttpServerBuilder>, HttpServer>,

    /// A copy of the wrapped metasrv's options, taken in `new`.
    opts: MetasrvOptions,

    /// Sender used by `shutdown` to tell the gRPC server to stop. Set by `start`.
    signal_sender: Option<Sender<()>>,

    /// A clone of the wrapped metasrv's plugins, taken in `new`.
    plugins: Plugins,

    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// It is taken (and checked for a serving error) by `shutdown`.
    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,

    /// The address the gRPC listener is actually bound to; `None` until `start` binds it.
    bind_addr: Option<SocketAddr>,
}
85
86impl MetasrvInstance {
87    pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
88        let opts = metasrv.options().clone();
89        let plugins = metasrv.plugins().clone();
90        let metasrv = Arc::new(metasrv);
91
92        // Wire up the admin_axum_router as an extra router
93        let extra_routers = admin_axum_router(metasrv.clone());
94
95        let mut builder = HttpServerBuilder::new(opts.http.clone())
96            .with_metrics_handler(MetricsHandler)
97            .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
98        builder = builder.with_extra_router(extra_routers);
99
100        // put metasrv into plugins for later use
101        plugins.insert::<Arc<Metasrv>>(metasrv.clone());
102        Ok(MetasrvInstance {
103            metasrv,
104            http_server: Either::Left(Some(builder)),
105            opts,
106            signal_sender: None,
107            plugins,
108            serve_state: Default::default(),
109            bind_addr: None,
110        })
111    }
112
113    pub async fn start(&mut self) -> Result<()> {
114        if let Some(builder) = self.http_server.as_mut().left()
115            && let Some(builder) = builder.take()
116        {
117            let mut server = builder.build();
118
119            let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
120                addr: &self.opts.http.addr,
121            })?;
122            info!("starting http server at {}", addr);
123            server.start(addr).await.context(error::StartHttpSnafu)?;
124
125            self.http_server = Either::Right(server);
126        } else {
127            // If the http server builder is not present, the Metasrv has to be called "start"
128            // already, regardless of the startup was successful or not. Return an `Ok` here for
129            // simplicity.
130            return Ok(());
131        };
132
133        self.metasrv.try_start().await?;
134
135        let (tx, rx) = mpsc::channel::<()>(1);
136
137        self.signal_sender = Some(tx);
138
139        // Start gRPC server with admin services for backward compatibility
140        let mut router = router(self.metasrv.clone());
141        if let Some(configurator) = self
142            .metasrv
143            .plugins()
144            .get::<GrpcRouterConfiguratorRef<()>>()
145        {
146            router = configurator
147                .configure_grpc_router(router, ())
148                .await
149                .context(OtherSnafu)?;
150        }
151
152        let (serve_state_tx, serve_state_rx) = oneshot::channel();
153
154        let socket_addr =
155            bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
156                .await?;
157        self.bind_addr = Some(socket_addr);
158
159        *self.serve_state.lock().await = Some(serve_state_rx);
160        Ok(())
161    }
162
163    pub async fn shutdown(&self) -> Result<()> {
164        if let Some(mut rx) = self.serve_state.lock().await.take()
165            && let Ok(Err(err)) = rx.try_recv()
166        {
167            common_telemetry::error!(err; "Metasrv start failed")
168        }
169        if let Some(signal) = &self.signal_sender {
170            signal
171                .send(())
172                .await
173                .context(error::SendShutdownSignalSnafu)?;
174        }
175        self.metasrv.shutdown().await?;
176
177        if let Some(http_server) = self.http_server.as_ref().right() {
178            http_server
179                .shutdown()
180                .await
181                .context(error::ShutdownServerSnafu {
182                    server: http_server.name(),
183                })?;
184        }
185        Ok(())
186    }
187
188    pub fn plugins(&self) -> Plugins {
189        self.plugins.clone()
190    }
191
192    pub fn get_inner(&self) -> &Metasrv {
193        &self.metasrv
194    }
195    pub fn bind_addr(&self) -> &Option<SocketAddr> {
196        &self.bind_addr
197    }
198
199    pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
200        &mut self.http_server
201    }
202
203    pub fn http_server(&self) -> Option<&HttpServer> {
204        self.http_server.as_ref().right()
205    }
206}
207
208pub async fn bootstrap_metasrv_with_router(
209    bind_addr: &str,
210    router: Router,
211    serve_state_tx: oneshot::Sender<Result<()>>,
212    mut shutdown_rx: Receiver<()>,
213) -> Result<SocketAddr> {
214    let listener = TcpListener::bind(bind_addr)
215        .await
216        .context(error::TcpBindSnafu { addr: bind_addr })?;
217
218    let real_bind_addr = listener
219        .local_addr()
220        .context(error::TcpBindSnafu { addr: bind_addr })?;
221
222    info!("gRPC server is bound to: {}", real_bind_addr);
223
224    let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
225
226    let _handle = common_runtime::spawn_global(async move {
227        let result = router
228            .serve_with_incoming_shutdown(incoming, async {
229                let _ = shutdown_rx.recv().await;
230            })
231            .await
232            .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
233            .context(error::StartGrpcSnafu);
234        let _ = serve_state_tx.send(result);
235    });
236
237    Ok(real_bind_addr)
238}
239
/// Adds `$server` to the tonic `$builder` with gzip and zstd compression enabled, and evaluates
/// to whatever `$builder.add_service(..)` returns.
///
/// Compression is enabled in both directions: the service accepts compressed requests and may
/// send compressed responses.
///
/// Note: the expansion names `CompressionEncoding` unqualified, so it must be in scope at the
/// call site (e.g. via `use tonic::codec::CompressionEncoding;`).
#[macro_export]
macro_rules! add_compressed_service {
    ($builder:expr, $server:expr) => {
        $builder.add_service(
            $server
                // Accept requests compressed with either encoding.
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                // Allow responses to be compressed with either encoding.
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
252
253pub fn router(metasrv: Arc<Metasrv>) -> Router {
254    let mut router = tonic::transport::Server::builder()
255        // for admin services
256        .accept_http1(true)
257        // For quick network failures detection.
258        .http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL))
259        .http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT));
260    let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
261    let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
262    let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
263    let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
264    router.add_service(admin::make_admin_service(metasrv))
265}
266
267pub async fn metasrv_builder(
268    opts: &MetasrvOptions,
269    plugins: Plugins,
270    kv_backend: Option<KvBackendRef>,
271) -> Result<MetasrvBuilder> {
272    let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
273        (Some(kv_backend), _) => (kv_backend, None),
274        (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
275        (None, BackendImpl::EtcdStore) => {
276            let etcd_client =
277                create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
278            let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
279            let election = EtcdElection::with_etcd_client(
280                &opts.grpc.server_addr,
281                etcd_client,
282                opts.store_key_prefix.clone(),
283            )
284            .await?;
285
286            (kv_backend, Some(election))
287        }
288        #[cfg(feature = "pg_kvbackend")]
289        (None, BackendImpl::PostgresStore) => {
290            use std::time::Duration;
291
292            use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
293            use common_meta::kv_backend::rds::PgStore;
294            use deadpool_postgres::Config;
295
296            use crate::election::rds::postgres::{ElectionPgClient, PgElection};
297            use crate::utils::postgres::create_postgres_pool;
298
299            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
300            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
301            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
302            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
303            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
304
305            let mut cfg = Config::new();
306            cfg.keepalives = Some(true);
307            cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
308            // We use a separate pool for election since we need a different session keep-alive idle time.
309            let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
310                .await?;
311
312            let election_client = ElectionPgClient::new(
313                pool,
314                execution_timeout,
315                idle_session_timeout,
316                statement_timeout,
317            )?;
318            let election = PgElection::with_pg_client(
319                opts.grpc.server_addr.clone(),
320                election_client,
321                opts.store_key_prefix.clone(),
322                candidate_lease_ttl,
323                meta_lease_ttl,
324                opts.meta_schema_name.as_deref(),
325                &opts.meta_table_name,
326                opts.meta_election_lock_id,
327            )
328            .await?;
329
330            let pool =
331                create_postgres_pool(&opts.store_addrs, None, opts.backend_tls.clone()).await?;
332            let kv_backend = PgStore::with_pg_pool(
333                pool,
334                opts.meta_schema_name.as_deref(),
335                &opts.meta_table_name,
336                opts.max_txn_ops,
337            )
338            .await
339            .context(error::KvBackendSnafu)?;
340
341            (kv_backend, Some(election))
342        }
343        #[cfg(feature = "mysql_kvbackend")]
344        (None, BackendImpl::MysqlStore) => {
345            use std::time::Duration;
346
347            use common_meta::kv_backend::rds::MySqlStore;
348
349            use crate::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
350            use crate::utils::mysql::create_mysql_pool;
351
352            let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
353            let kv_backend =
354                MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
355                    .await
356                    .context(error::KvBackendSnafu)?;
357            // Since election will acquire a lock of the table, we need a separate table for election.
358            let election_table_name = opts.meta_table_name.clone() + "_election";
359            // We use a separate pool for election since we need a different session keep-alive idle time.
360            let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
361            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
362            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
363            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
364            let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
365            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
366            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
367
368            let election_client = ElectionMysqlClient::new(
369                pool,
370                execution_timeout,
371                statement_timeout,
372                innode_lock_wait_timeout,
373                idle_session_timeout,
374                &election_table_name,
375            );
376            let election = MySqlElection::with_mysql_client(
377                opts.grpc.server_addr.clone(),
378                election_client,
379                opts.store_key_prefix.clone(),
380                candidate_lease_ttl,
381                meta_lease_ttl,
382                &election_table_name,
383            )
384            .await?;
385            (kv_backend, Some(election))
386        }
387    };
388
389    if !opts.store_key_prefix.is_empty() {
390        info!(
391            "using chroot kv backend with prefix: {prefix}",
392            prefix = opts.store_key_prefix
393        );
394        kv_backend = Arc::new(ChrootKvBackend::new(
395            opts.store_key_prefix.clone().into_bytes(),
396            kv_backend,
397        ))
398    }
399
400    let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
401    let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
402
403    let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
404        info!("Using selector from plugins");
405        selector
406    } else {
407        let selector: Arc<
408            dyn Selector<
409                    Context = crate::metasrv::SelectorContext,
410                    Output = Vec<common_meta::peer::Peer>,
411                >,
412        > = match opts.selector {
413            SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
414                RegionNumsBasedWeightCompute,
415                meta_peer_client.clone(),
416            )) as SelectorRef,
417            SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
418            SelectorType::RoundRobin => {
419                Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
420            }
421        };
422        info!(
423            "Using selector from options, selector type: {}",
424            opts.selector.as_ref()
425        );
426        selector
427    };
428
429    Ok(MetasrvBuilder::new()
430        .options(opts.clone())
431        .kv_backend(kv_backend)
432        .in_memory(in_memory)
433        .selector(selector)
434        .election(election)
435        .meta_peer_client(meta_peer_client)
436        .plugins(plugins))
437}
438
439pub(crate) fn build_default_meta_peer_client(
440    election: &Option<ElectionRef>,
441    in_memory: &ResettableKvBackendRef,
442) -> MetaPeerClientRef {
443    MetaPeerClientBuilder::default()
444        .election(election.clone())
445        .in_memory(in_memory.clone())
446        .build()
447        .map(Arc::new)
448        // Safety: all required fields set at initialization
449        .unwrap()
450}