meta_srv/
bootstrap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
25use common_meta::distributed_time_constants::META_LEASE_SECS;
26use common_meta::kv_backend::chroot::ChrootKvBackend;
27use common_meta::kv_backend::etcd::EtcdStore;
28use common_meta::kv_backend::memory::MemoryKvBackend;
29#[cfg(feature = "mysql_kvbackend")]
30use common_meta::kv_backend::rds::MySqlStore;
31#[cfg(feature = "pg_kvbackend")]
32use common_meta::kv_backend::rds::PgStore;
33use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
34use common_telemetry::info;
35#[cfg(feature = "pg_kvbackend")]
36use deadpool_postgres::{Config, Runtime};
37use etcd_client::Client;
38use servers::configurator::ConfiguratorRef;
39use servers::export_metrics::ExportMetricsTask;
40use servers::http::{HttpServer, HttpServerBuilder};
41use servers::metrics_handler::MetricsHandler;
42use servers::server::Server;
43#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
44use snafu::OptionExt;
45use snafu::ResultExt;
46#[cfg(feature = "mysql_kvbackend")]
47use sqlx::mysql::MySqlConnectOptions;
48#[cfg(feature = "mysql_kvbackend")]
49use sqlx::mysql::MySqlPool;
50use tokio::net::TcpListener;
51use tokio::sync::mpsc::{self, Receiver, Sender};
52use tokio::sync::{oneshot, Mutex};
53#[cfg(feature = "pg_kvbackend")]
54use tokio_postgres::NoTls;
55use tonic::codec::CompressionEncoding;
56use tonic::transport::server::{Router, TcpIncoming};
57
58use crate::election::etcd::EtcdElection;
59#[cfg(feature = "mysql_kvbackend")]
60use crate::election::rds::mysql::MySqlElection;
61#[cfg(feature = "pg_kvbackend")]
62use crate::election::rds::postgres::PgElection;
63#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
64use crate::election::CANDIDATE_LEASE_SECS;
65use crate::metasrv::builder::MetasrvBuilder;
66use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
67use crate::node_excluder::NodeExcluderRef;
68use crate::selector::lease_based::LeaseBasedSelector;
69use crate::selector::load_based::LoadBasedSelector;
70use crate::selector::round_robin::RoundRobinSelector;
71use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
72use crate::selector::SelectorType;
73use crate::service::admin;
74use crate::{error, Result};
75
76pub struct MetasrvInstance {
77    metasrv: Arc<Metasrv>,
78
79    http_server: HttpServer,
80
81    opts: MetasrvOptions,
82
83    signal_sender: Option<Sender<()>>,
84
85    plugins: Plugins,
86
87    export_metrics_task: Option<ExportMetricsTask>,
88
89    /// gRPC serving state receiver. Only present if the gRPC server is started.
90    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
91
92    /// gRPC bind addr
93    bind_addr: Option<SocketAddr>,
94}
95
96impl MetasrvInstance {
97    pub async fn new(
98        opts: MetasrvOptions,
99        plugins: Plugins,
100        metasrv: Metasrv,
101    ) -> Result<MetasrvInstance> {
102        let http_server = HttpServerBuilder::new(opts.http.clone())
103            .with_metrics_handler(MetricsHandler)
104            .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
105            .build();
106
107        let metasrv = Arc::new(metasrv);
108        // put metasrv into plugins for later use
109        plugins.insert::<Arc<Metasrv>>(metasrv.clone());
110        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
111            .context(error::InitExportMetricsTaskSnafu)?;
112        Ok(MetasrvInstance {
113            metasrv,
114            http_server,
115            opts,
116            signal_sender: None,
117            plugins,
118            export_metrics_task,
119            serve_state: Default::default(),
120            bind_addr: None,
121        })
122    }
123
124    pub async fn start(&mut self) -> Result<()> {
125        self.metasrv.try_start().await?;
126
127        if let Some(t) = self.export_metrics_task.as_ref() {
128            t.start(None).context(error::InitExportMetricsTaskSnafu)?
129        }
130
131        let (tx, rx) = mpsc::channel::<()>(1);
132
133        self.signal_sender = Some(tx);
134
135        let mut router = router(self.metasrv.clone());
136        if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
137            router = configurator.config_grpc(router);
138        }
139
140        let (serve_state_tx, serve_state_rx) = oneshot::channel();
141
142        let socket_addr =
143            bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
144                .await?;
145        self.bind_addr = Some(socket_addr);
146
147        let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
148            addr: &self.opts.http.addr,
149        })?;
150        self.http_server
151            .start(addr)
152            .await
153            .context(error::StartHttpSnafu)?;
154
155        *self.serve_state.lock().await = Some(serve_state_rx);
156        Ok(())
157    }
158
159    pub async fn shutdown(&self) -> Result<()> {
160        if let Some(mut rx) = self.serve_state.lock().await.take() {
161            if let Ok(Err(err)) = rx.try_recv() {
162                common_telemetry::error!(err; "Metasrv start failed")
163            }
164        }
165        if let Some(signal) = &self.signal_sender {
166            signal
167                .send(())
168                .await
169                .context(error::SendShutdownSignalSnafu)?;
170        }
171        self.metasrv.shutdown().await?;
172        self.http_server
173            .shutdown()
174            .await
175            .context(error::ShutdownServerSnafu {
176                server: self.http_server.name(),
177            })?;
178        Ok(())
179    }
180
181    pub fn plugins(&self) -> Plugins {
182        self.plugins.clone()
183    }
184
185    pub fn get_inner(&self) -> &Metasrv {
186        &self.metasrv
187    }
188    pub fn bind_addr(&self) -> &Option<SocketAddr> {
189        &self.bind_addr
190    }
191}
192
193pub async fn bootstrap_metasrv_with_router(
194    bind_addr: &str,
195    router: Router,
196    serve_state_tx: oneshot::Sender<Result<()>>,
197    mut shutdown_rx: Receiver<()>,
198) -> Result<SocketAddr> {
199    let listener = TcpListener::bind(bind_addr)
200        .await
201        .context(error::TcpBindSnafu { addr: bind_addr })?;
202
203    let real_bind_addr = listener
204        .local_addr()
205        .context(error::TcpBindSnafu { addr: bind_addr })?;
206
207    info!("gRPC server is bound to: {}", real_bind_addr);
208
209    let incoming =
210        TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;
211
212    let _handle = common_runtime::spawn_global(async move {
213        let result = router
214            .serve_with_incoming_shutdown(incoming, async {
215                let _ = shutdown_rx.recv().await;
216            })
217            .await
218            .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
219            .context(error::StartGrpcSnafu);
220        let _ = serve_state_tx.send(result);
221    });
222
223    Ok(real_bind_addr)
224}
225
/// Adds `$service` to `$target` via `add_service`, after enabling Gzip and Zstd
/// for both accepted and sent messages.
///
/// `CompressionEncoding` is referenced unqualified, so it must be in scope
/// wherever the macro is expanded.
#[macro_export]
macro_rules! add_compressed_service {
    ($target:expr, $service:expr) => {
        $target.add_service(
            $service
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
238
239pub fn router(metasrv: Arc<Metasrv>) -> Router {
240    let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
241    let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
242    let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
243    let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
244    let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
245    router.add_service(admin::make_admin_service(metasrv))
246}
247
248pub async fn metasrv_builder(
249    opts: &MetasrvOptions,
250    plugins: Plugins,
251    kv_backend: Option<KvBackendRef>,
252) -> Result<MetasrvBuilder> {
253    let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
254        (Some(kv_backend), _) => (kv_backend, None),
255        (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
256        (None, BackendImpl::EtcdStore) => {
257            let etcd_client = create_etcd_client(&opts.store_addrs).await?;
258            let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
259            let election = EtcdElection::with_etcd_client(
260                &opts.grpc.server_addr,
261                etcd_client,
262                opts.store_key_prefix.clone(),
263            )
264            .await?;
265
266            (kv_backend, Some(election))
267        }
268        #[cfg(feature = "pg_kvbackend")]
269        (None, BackendImpl::PostgresStore) => {
270            use std::time::Duration;
271
272            use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
273
274            use crate::election::rds::postgres::ElectionPgClient;
275
276            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
277            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
278            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
279            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
280            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
281
282            let mut cfg = Config::new();
283            cfg.keepalives = Some(true);
284            cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
285            // We use a separate pool for election since we need a different session keep-alive idle time.
286            let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
287
288            let election_client = ElectionPgClient::new(
289                pool,
290                execution_timeout,
291                idle_session_timeout,
292                statement_timeout,
293            )?;
294            let election = PgElection::with_pg_client(
295                opts.grpc.server_addr.clone(),
296                election_client,
297                opts.store_key_prefix.clone(),
298                candidate_lease_ttl,
299                meta_lease_ttl,
300                &opts.meta_table_name,
301                opts.meta_election_lock_id,
302            )
303            .await?;
304
305            let pool = create_postgres_pool(&opts.store_addrs).await?;
306            let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
307                .await
308                .context(error::KvBackendSnafu)?;
309
310            (kv_backend, Some(election))
311        }
312        #[cfg(feature = "mysql_kvbackend")]
313        (None, BackendImpl::MysqlStore) => {
314            use std::time::Duration;
315
316            use crate::election::rds::mysql::ElectionMysqlClient;
317
318            let pool = create_mysql_pool(&opts.store_addrs).await?;
319            let kv_backend =
320                MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
321                    .await
322                    .context(error::KvBackendSnafu)?;
323            // Since election will acquire a lock of the table, we need a separate table for election.
324            let election_table_name = opts.meta_table_name.clone() + "_election";
325            // We use a separate pool for election since we need a different session keep-alive idle time.
326            let pool = create_mysql_pool(&opts.store_addrs).await?;
327            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
328            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
329            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
330            let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
331            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
332            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
333
334            let election_client = ElectionMysqlClient::new(
335                pool,
336                execution_timeout,
337                statement_timeout,
338                innode_lock_wait_timeout,
339                idle_session_timeout,
340                &election_table_name,
341            );
342            let election = MySqlElection::with_mysql_client(
343                opts.grpc.server_addr.clone(),
344                election_client,
345                opts.store_key_prefix.clone(),
346                candidate_lease_ttl,
347                meta_lease_ttl,
348                &election_table_name,
349            )
350            .await?;
351            (kv_backend, Some(election))
352        }
353    };
354
355    if !opts.store_key_prefix.is_empty() {
356        info!(
357            "using chroot kv backend with prefix: {prefix}",
358            prefix = opts.store_key_prefix
359        );
360        kv_backend = Arc::new(ChrootKvBackend::new(
361            opts.store_key_prefix.clone().into_bytes(),
362            kv_backend,
363        ))
364    }
365
366    let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
367
368    let node_excluder = plugins
369        .get::<NodeExcluderRef>()
370        .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
371    let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
372        info!("Using selector from plugins");
373        selector
374    } else {
375        let selector = match opts.selector {
376            SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
377                RegionNumsBasedWeightCompute,
378                node_excluder,
379            )) as SelectorRef,
380            SelectorType::LeaseBased => {
381                Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
382            }
383            SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
384                SelectTarget::Datanode,
385                node_excluder,
386            )) as SelectorRef,
387        };
388        info!(
389            "Using selector from options, selector type: {}",
390            opts.selector.as_ref()
391        );
392        selector
393    };
394
395    Ok(MetasrvBuilder::new()
396        .options(opts.clone())
397        .kv_backend(kv_backend)
398        .in_memory(in_memory)
399        .selector(selector)
400        .election(election)
401        .plugins(plugins))
402}
403
404pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
405    let etcd_endpoints = store_addrs
406        .iter()
407        .map(|x| x.trim())
408        .filter(|x| !x.is_empty())
409        .collect::<Vec<_>>();
410    Client::connect(&etcd_endpoints, None)
411        .await
412        .context(error::ConnectEtcdSnafu)
413}
414
/// Creates a pool for the Postgres backend with a default [`Config`].
///
/// Only the first entry of `store_addrs` is used.
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
    let default_cfg = Config::new();
    create_postgres_pool_with(store_addrs, default_cfg).await
}
422
/// Creates a pool for the Postgres backend from the given `cfg`.
///
/// Only the first entry of `store_addrs` is used; it overwrites `cfg.url`.
///
/// # Errors
///
/// Fails if `store_addrs` is empty or the pool cannot be created.
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool_with(
    store_addrs: &[String],
    mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
    let first_addr = store_addrs.first().context(error::InvalidArgumentsSnafu {
        err_msg: "empty store addrs",
    })?;
    cfg.url = Some(first_addr.clone());

    cfg.create_pool(Some(Runtime::Tokio1), NoTls)
        .context(error::CreatePostgresPoolSnafu)
}
440
/// Builds the MySQL connect options from the first entry of `store_addrs`.
///
/// # Errors
///
/// Fails if `store_addrs` is empty or the first entry cannot be parsed as
/// MySQL connect options.
#[cfg(feature = "mysql_kvbackend")]
async fn setup_mysql_options(store_addrs: &[String]) -> Result<MySqlConnectOptions> {
    let url = store_addrs.first().context(error::InvalidArgumentsSnafu {
        err_msg: "empty store addrs",
    })?;
    let parsed: MySqlConnectOptions = url
        .parse()
        .context(error::ParseMySqlUrlSnafu { mysql_url: url })?;

    // Avoid `SET` commands in sqlx
    Ok(parsed
        .no_engine_substitution(false)
        .pipes_as_concat(false)
        .timezone(None)
        .set_names(false))
}
457
/// Creates a MySQL connection pool from the first entry of `store_addrs`.
///
/// # Errors
///
/// Fails if the connect options cannot be built or the pool cannot connect.
#[cfg(feature = "mysql_kvbackend")]
pub async fn create_mysql_pool(store_addrs: &[String]) -> Result<MySqlPool> {
    let connect_options = setup_mysql_options(store_addrs).await?;

    MySqlPool::connect_with(connect_options)
        .await
        .context(error::CreateMySqlPoolSnafu)
}
466}