Skip to main content

meta_srv/
bootstrap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::config_server::ConfigServer;
20use api::v1::meta::heartbeat_server::HeartbeatServer;
21use api::v1::meta::procedure_service_server::ProcedureServiceServer;
22use api::v1::meta::store_server::StoreServer;
23use common_base::Plugins;
24use common_config::Configurable;
25#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
26use common_meta::distributed_time_constants::META_LEASE_SECS;
27use common_meta::election::etcd::EtcdElection;
28use common_meta::kv_backend::chroot::ChrootKvBackend;
29use common_meta::kv_backend::etcd::EtcdStore;
30use common_meta::kv_backend::memory::MemoryKvBackend;
31use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
32use common_telemetry::info;
33use either::Either;
34use servers::configurator::GrpcRouterConfiguratorRef;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::metrics_handler::MetricsHandler;
37use servers::server::Server;
38use snafu::ResultExt;
39use tokio::net::TcpListener;
40use tokio::sync::mpsc::{self, Receiver, Sender};
41use tokio::sync::{Mutex, oneshot};
42use tonic::codec::CompressionEncoding;
43use tonic::transport::server::{Router, TcpIncoming};
44
45use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
46use crate::error::OtherSnafu;
47use crate::metasrv::builder::MetasrvBuilder;
48use crate::metasrv::{
49    BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
50};
51use crate::selector::lease_based::LeaseBasedSelector;
52use crate::selector::load_based::LoadBasedSelector;
53use crate::selector::round_robin::RoundRobinSelector;
54use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
55use crate::selector::{Selector, SelectorType};
56use crate::service::admin;
57use crate::service::admin::admin_axum_router;
58use crate::utils::etcd::create_etcd_client_with_tls;
59use crate::{Result, error};
60
61pub struct MetasrvInstance {
62    metasrv: Arc<Metasrv>,
63
64    http_server: Either<Option<HttpServerBuilder>, HttpServer>,
65
66    opts: MetasrvOptions,
67
68    signal_sender: Option<Sender<()>>,
69
70    plugins: Plugins,
71
72    /// gRPC serving state receiver. Only present if the gRPC server is started.
73    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
74
75    /// gRPC bind addr
76    bind_addr: Option<SocketAddr>,
77}
78
79impl MetasrvInstance {
80    pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
81        let opts = metasrv.options().clone();
82        let plugins = metasrv.plugins().clone();
83        let metasrv = Arc::new(metasrv);
84
85        // Wire up the admin_axum_router as an extra router
86        let extra_routers = admin_axum_router(metasrv.clone());
87
88        let mut builder = HttpServerBuilder::new(opts.http.clone())
89            .with_metrics_handler(MetricsHandler)
90            .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
91        builder = builder.with_extra_router(extra_routers);
92
93        // put metasrv into plugins for later use
94        plugins.insert::<Arc<Metasrv>>(metasrv.clone());
95        Ok(MetasrvInstance {
96            metasrv,
97            http_server: Either::Left(Some(builder)),
98            opts,
99            signal_sender: None,
100            plugins,
101            serve_state: Default::default(),
102            bind_addr: None,
103        })
104    }
105
106    pub async fn start(&mut self) -> Result<()> {
107        if let Some(builder) = self.http_server.as_mut().left()
108            && let Some(builder) = builder.take()
109        {
110            let mut server = builder.build();
111
112            let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
113                addr: &self.opts.http.addr,
114            })?;
115            info!("starting http server at {}", addr);
116            server.start(addr).await.context(error::StartHttpSnafu)?;
117
118            self.http_server = Either::Right(server);
119        } else {
120            // If the http server builder is not present, the Metasrv has to be called "start"
121            // already, regardless of the startup was successful or not. Return an `Ok` here for
122            // simplicity.
123            return Ok(());
124        };
125
126        self.metasrv.try_start().await?;
127
128        let (tx, rx) = mpsc::channel::<()>(1);
129
130        self.signal_sender = Some(tx);
131
132        // Start gRPC server with admin services for backward compatibility
133        let mut router = router(self.metasrv.clone());
134        if let Some(configurator) = self
135            .metasrv
136            .plugins()
137            .get::<GrpcRouterConfiguratorRef<()>>()
138        {
139            router = configurator
140                .configure_grpc_router(router, ())
141                .await
142                .context(OtherSnafu)?;
143        }
144
145        let (serve_state_tx, serve_state_rx) = oneshot::channel();
146
147        let socket_addr =
148            bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
149                .await?;
150        self.bind_addr = Some(socket_addr);
151
152        *self.serve_state.lock().await = Some(serve_state_rx);
153        Ok(())
154    }
155
156    pub async fn shutdown(&self) -> Result<()> {
157        if let Some(mut rx) = self.serve_state.lock().await.take()
158            && let Ok(Err(err)) = rx.try_recv()
159        {
160            common_telemetry::error!(err; "Metasrv start failed")
161        }
162        if let Some(signal) = &self.signal_sender {
163            signal
164                .send(())
165                .await
166                .context(error::SendShutdownSignalSnafu)?;
167        }
168        self.metasrv.shutdown().await?;
169
170        if let Some(http_server) = self.http_server.as_ref().right() {
171            http_server
172                .shutdown()
173                .await
174                .context(error::ShutdownServerSnafu {
175                    server: http_server.name(),
176                })?;
177        }
178        Ok(())
179    }
180
181    pub fn plugins(&self) -> Plugins {
182        self.plugins.clone()
183    }
184
185    pub fn get_inner(&self) -> &Metasrv {
186        &self.metasrv
187    }
188    pub fn bind_addr(&self) -> &Option<SocketAddr> {
189        &self.bind_addr
190    }
191
192    pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
193        &mut self.http_server
194    }
195
196    pub fn http_server(&self) -> Option<&HttpServer> {
197        self.http_server.as_ref().right()
198    }
199}
200
201pub async fn bootstrap_metasrv_with_router(
202    bind_addr: &str,
203    router: Router,
204    serve_state_tx: oneshot::Sender<Result<()>>,
205    mut shutdown_rx: Receiver<()>,
206) -> Result<SocketAddr> {
207    let listener = TcpListener::bind(bind_addr)
208        .await
209        .context(error::TcpBindSnafu { addr: bind_addr })?;
210
211    let real_bind_addr = listener
212        .local_addr()
213        .context(error::TcpBindSnafu { addr: bind_addr })?;
214
215    info!("gRPC server is bound to: {}", real_bind_addr);
216
217    let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
218
219    let _handle = common_runtime::spawn_global(async move {
220        let result = router
221            .serve_with_incoming_shutdown(incoming, async {
222                let _ = shutdown_rx.recv().await;
223            })
224            .await
225            .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
226            .context(error::StartGrpcSnafu);
227        let _ = serve_state_tx.send(result);
228    });
229
230    Ok(real_bind_addr)
231}
232
/// Adds `$service` to `$router` with gzip and zstd compression both accepted and sent.
///
/// Expands to the value of `$router.add_service(..)`. The expansion refers to
/// `CompressionEncoding` by its bare name, so that name has to be in scope where the macro
/// is invoked.
#[macro_export]
macro_rules! add_compressed_service {
    ($router:expr, $service:expr) => (
        $router.add_service(
            $service
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    );
}
245
246pub fn router(metasrv: Arc<Metasrv>) -> Router {
247    let mut router = tonic::transport::Server::builder()
248        // for admin services
249        .accept_http1(true)
250        // For quick network failures detection.
251        .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
252        .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
253    let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
254    let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
255    let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
256    let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
257    let router = add_compressed_service!(router, ConfigServer::from_arc(metasrv.clone()));
258    router.add_service(admin::make_admin_service(metasrv))
259}
260
261pub async fn metasrv_builder(
262    opts: &MetasrvOptions,
263    plugins: Plugins,
264    kv_backend: Option<KvBackendRef>,
265) -> Result<MetasrvBuilder> {
266    let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
267        (Some(kv_backend), _) => (kv_backend, None),
268        (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
269        (None, BackendImpl::EtcdStore) => {
270            let etcd_client = create_etcd_client_with_tls(
271                &opts.store_addrs,
272                &opts.backend_client,
273                opts.backend_tls.as_ref(),
274            )
275            .await?;
276            let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
277            let election = EtcdElection::with_etcd_client(
278                &opts.grpc.server_addr,
279                etcd_client,
280                opts.store_key_prefix.clone(),
281            )
282            .await
283            .context(error::KvBackendSnafu)?;
284
285            (kv_backend, Some(election))
286        }
287        #[cfg(feature = "pg_kvbackend")]
288        (None, BackendImpl::PostgresStore) => {
289            use std::time::Duration;
290
291            use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
292            use common_meta::election::CANDIDATE_LEASE_SECS;
293            use common_meta::election::rds::postgres::{ElectionPgClient, PgElection};
294            use common_meta::kv_backend::rds::PgStore;
295            use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod};
296
297            use crate::utils::postgres::create_postgres_pool;
298
299            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
300            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
301            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
302            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
303            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
304
305            let mut cfg = Config::new();
306            cfg.keepalives = Some(true);
307            cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
308            cfg.manager = Some(ManagerConfig {
309                recycling_method: RecyclingMethod::Verified,
310            });
311            // Use a dedicated pool for the election client to allow customized session settings.
312            let pool = create_postgres_pool(
313                &opts.store_addrs,
314                Some(cfg.clone()),
315                opts.backend_tls.clone(),
316            )
317            .await?;
318
319            let election_client = ElectionPgClient::new(
320                pool,
321                execution_timeout,
322                idle_session_timeout,
323                statement_timeout,
324            )
325            .context(error::KvBackendSnafu)?;
326            let election = PgElection::with_pg_client(
327                opts.grpc.server_addr.clone(),
328                election_client,
329                opts.store_key_prefix.clone(),
330                candidate_lease_ttl,
331                meta_lease_ttl,
332                opts.meta_schema_name.as_deref(),
333                &opts.meta_table_name,
334                opts.meta_election_lock_id,
335            )
336            .await
337            .context(error::KvBackendSnafu)?;
338
339            let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
340                .await?;
341            let kv_backend = PgStore::with_pg_pool(
342                pool,
343                opts.meta_schema_name.as_deref(),
344                &opts.meta_table_name,
345                opts.max_txn_ops,
346                opts.auto_create_schema,
347            )
348            .await
349            .context(error::KvBackendSnafu)?;
350
351            (kv_backend, Some(election))
352        }
353        #[cfg(feature = "mysql_kvbackend")]
354        (None, BackendImpl::MysqlStore) => {
355            use std::time::Duration;
356
357            use common_meta::election::CANDIDATE_LEASE_SECS;
358            use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
359            use common_meta::kv_backend::rds::MySqlStore;
360
361            use crate::utils::mysql::create_mysql_pool;
362
363            let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
364            let kv_backend =
365                MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
366                    .await
367                    .context(error::KvBackendSnafu)?;
368            // Since election will acquire a lock of the table, we need a separate table for election.
369            let election_table_name = opts.meta_table_name.clone() + "_election";
370            // We use a separate pool for election since we need a different session keep-alive idle time.
371            let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
372            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
373            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
374            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
375            let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
376            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
377            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
378
379            let election_client = ElectionMysqlClient::new(
380                pool,
381                execution_timeout,
382                statement_timeout,
383                innode_lock_wait_timeout,
384                idle_session_timeout,
385                &election_table_name,
386            );
387            let election = MySqlElection::with_mysql_client(
388                opts.grpc.server_addr.clone(),
389                election_client,
390                opts.store_key_prefix.clone(),
391                candidate_lease_ttl,
392                meta_lease_ttl,
393                &election_table_name,
394            )
395            .await
396            .context(error::KvBackendSnafu)?;
397            (kv_backend, Some(election))
398        }
399    };
400
401    if !opts.store_key_prefix.is_empty() {
402        info!(
403            "using chroot kv backend with prefix: {prefix}",
404            prefix = opts.store_key_prefix
405        );
406        kv_backend = Arc::new(ChrootKvBackend::new(
407            opts.store_key_prefix.clone().into_bytes(),
408            kv_backend,
409        ))
410    }
411
412    let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
413    let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
414
415    let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
416        info!("Using selector from plugins");
417        selector
418    } else {
419        let selector: Arc<
420            dyn Selector<
421                    Context = crate::metasrv::SelectorContext,
422                    Output = Vec<common_meta::peer::Peer>,
423                >,
424        > = match opts.selector {
425            SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
426                RegionNumsBasedWeightCompute,
427                meta_peer_client.clone(),
428            )) as SelectorRef,
429            SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
430            SelectorType::RoundRobin => {
431                Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
432            }
433        };
434        info!(
435            "Using selector from options, selector type: {}",
436            opts.selector.as_ref()
437        );
438        selector
439    };
440
441    Ok(MetasrvBuilder::new()
442        .options(opts.clone())
443        .kv_backend(kv_backend)
444        .in_memory(in_memory)
445        .selector(selector)
446        .election(election)
447        .meta_peer_client(meta_peer_client)
448        .plugins(plugins))
449}
450
451pub(crate) fn build_default_meta_peer_client(
452    election: &Option<ElectionRef>,
453    in_memory: &ResettableKvBackendRef,
454) -> MetaPeerClientRef {
455    MetaPeerClientBuilder::default()
456        .election(election.clone())
457        .in_memory(in_memory.clone())
458        .build()
459        .map(Arc::new)
460        // Safety: all required fields set at initialization
461        .unwrap()
462}