meta_srv/
bootstrap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(feature = "pg_kvbackend")]
25use common_error::ext::BoxedError;
26#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
27use common_meta::distributed_time_constants::META_LEASE_SECS;
28use common_meta::kv_backend::chroot::ChrootKvBackend;
29use common_meta::kv_backend::etcd::EtcdStore;
30use common_meta::kv_backend::memory::MemoryKvBackend;
31#[cfg(feature = "pg_kvbackend")]
32use common_meta::kv_backend::rds::postgres::create_postgres_tls_connector;
33#[cfg(feature = "pg_kvbackend")]
34use common_meta::kv_backend::rds::postgres::{TlsMode as PgTlsMode, TlsOption as PgTlsOption};
35#[cfg(feature = "mysql_kvbackend")]
36use common_meta::kv_backend::rds::MySqlStore;
37#[cfg(feature = "pg_kvbackend")]
38use common_meta::kv_backend::rds::PgStore;
39use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
40use common_telemetry::info;
41#[cfg(feature = "pg_kvbackend")]
42use deadpool_postgres::{Config, Runtime};
43use either::Either;
44use etcd_client::{Client, ConnectOptions};
45use servers::configurator::ConfiguratorRef;
46use servers::export_metrics::ExportMetricsTask;
47use servers::http::{HttpServer, HttpServerBuilder};
48use servers::metrics_handler::MetricsHandler;
49use servers::server::Server;
50use servers::tls::TlsOption;
51#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
52use snafu::OptionExt;
53use snafu::ResultExt;
54#[cfg(feature = "mysql_kvbackend")]
55use sqlx::mysql::MySqlConnectOptions;
56#[cfg(feature = "mysql_kvbackend")]
57use sqlx::mysql::MySqlPool;
58use tokio::net::TcpListener;
59use tokio::sync::mpsc::{self, Receiver, Sender};
60use tokio::sync::{oneshot, Mutex};
61#[cfg(feature = "pg_kvbackend")]
62use tokio_postgres::NoTls;
63use tonic::codec::CompressionEncoding;
64use tonic::transport::server::{Router, TcpIncoming};
65
66use crate::election::etcd::EtcdElection;
67#[cfg(feature = "mysql_kvbackend")]
68use crate::election::rds::mysql::MySqlElection;
69#[cfg(feature = "pg_kvbackend")]
70use crate::election::rds::postgres::PgElection;
71#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
72use crate::election::CANDIDATE_LEASE_SECS;
73use crate::metasrv::builder::MetasrvBuilder;
74use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
75use crate::node_excluder::NodeExcluderRef;
76use crate::selector::lease_based::LeaseBasedSelector;
77use crate::selector::load_based::LoadBasedSelector;
78use crate::selector::round_robin::RoundRobinSelector;
79use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
80use crate::selector::SelectorType;
81use crate::service::admin;
82use crate::service::admin::admin_axum_router;
83use crate::{error, Result};
84
85pub struct MetasrvInstance {
86    metasrv: Arc<Metasrv>,
87
88    http_server: Either<Option<HttpServerBuilder>, HttpServer>,
89
90    opts: MetasrvOptions,
91
92    signal_sender: Option<Sender<()>>,
93
94    plugins: Plugins,
95
96    export_metrics_task: Option<ExportMetricsTask>,
97
98    /// gRPC serving state receiver. Only present if the gRPC server is started.
99    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
100
101    /// gRPC bind addr
102    bind_addr: Option<SocketAddr>,
103}
104
105impl MetasrvInstance {
106    pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
107        let opts = metasrv.options().clone();
108        let plugins = metasrv.plugins().clone();
109        let metasrv = Arc::new(metasrv);
110
111        // Wire up the admin_axum_router as an extra router
112        let extra_routers = admin_axum_router(metasrv.clone());
113
114        let mut builder = HttpServerBuilder::new(opts.http.clone())
115            .with_metrics_handler(MetricsHandler)
116            .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
117        builder = builder.with_extra_router(extra_routers);
118
119        // put metasrv into plugins for later use
120        plugins.insert::<Arc<Metasrv>>(metasrv.clone());
121        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
122            .context(error::InitExportMetricsTaskSnafu)?;
123        Ok(MetasrvInstance {
124            metasrv,
125            http_server: Either::Left(Some(builder)),
126            opts,
127            signal_sender: None,
128            plugins,
129            export_metrics_task,
130            serve_state: Default::default(),
131            bind_addr: None,
132        })
133    }
134
135    pub async fn start(&mut self) -> Result<()> {
136        if let Some(builder) = self.http_server.as_mut().left()
137            && let Some(builder) = builder.take()
138        {
139            let mut server = builder.build();
140
141            let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
142                addr: &self.opts.http.addr,
143            })?;
144            info!("starting http server at {}", addr);
145            server.start(addr).await.context(error::StartHttpSnafu)?;
146
147            self.http_server = Either::Right(server);
148        } else {
149            // If the http server builder is not present, the Metasrv has to be called "start"
150            // already, regardless of the startup was successful or not. Return an `Ok` here for
151            // simplicity.
152            return Ok(());
153        };
154
155        self.metasrv.try_start().await?;
156
157        if let Some(t) = self.export_metrics_task.as_ref() {
158            t.start(None).context(error::InitExportMetricsTaskSnafu)?
159        }
160
161        let (tx, rx) = mpsc::channel::<()>(1);
162
163        self.signal_sender = Some(tx);
164
165        // Start gRPC server with admin services for backward compatibility
166        let mut router = router(self.metasrv.clone());
167        if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
168            router = configurator.config_grpc(router);
169        }
170
171        let (serve_state_tx, serve_state_rx) = oneshot::channel();
172
173        let socket_addr =
174            bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
175                .await?;
176        self.bind_addr = Some(socket_addr);
177
178        *self.serve_state.lock().await = Some(serve_state_rx);
179        Ok(())
180    }
181
182    pub async fn shutdown(&self) -> Result<()> {
183        if let Some(mut rx) = self.serve_state.lock().await.take() {
184            if let Ok(Err(err)) = rx.try_recv() {
185                common_telemetry::error!(err; "Metasrv start failed")
186            }
187        }
188        if let Some(signal) = &self.signal_sender {
189            signal
190                .send(())
191                .await
192                .context(error::SendShutdownSignalSnafu)?;
193        }
194        self.metasrv.shutdown().await?;
195
196        if let Some(http_server) = self.http_server.as_ref().right() {
197            http_server
198                .shutdown()
199                .await
200                .context(error::ShutdownServerSnafu {
201                    server: http_server.name(),
202                })?;
203        }
204        Ok(())
205    }
206
207    pub fn plugins(&self) -> Plugins {
208        self.plugins.clone()
209    }
210
211    pub fn get_inner(&self) -> &Metasrv {
212        &self.metasrv
213    }
214    pub fn bind_addr(&self) -> &Option<SocketAddr> {
215        &self.bind_addr
216    }
217
218    pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
219        &mut self.http_server
220    }
221
222    pub fn http_server(&self) -> Option<&HttpServer> {
223        self.http_server.as_ref().right()
224    }
225}
226
227pub async fn bootstrap_metasrv_with_router(
228    bind_addr: &str,
229    router: Router,
230    serve_state_tx: oneshot::Sender<Result<()>>,
231    mut shutdown_rx: Receiver<()>,
232) -> Result<SocketAddr> {
233    let listener = TcpListener::bind(bind_addr)
234        .await
235        .context(error::TcpBindSnafu { addr: bind_addr })?;
236
237    let real_bind_addr = listener
238        .local_addr()
239        .context(error::TcpBindSnafu { addr: bind_addr })?;
240
241    info!("gRPC server is bound to: {}", real_bind_addr);
242
243    let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
244
245    let _handle = common_runtime::spawn_global(async move {
246        let result = router
247            .serve_with_incoming_shutdown(incoming, async {
248                let _ = shutdown_rx.recv().await;
249            })
250            .await
251            .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
252            .context(error::StartGrpcSnafu);
253        let _ = serve_state_tx.send(result);
254    });
255
256    Ok(real_bind_addr)
257}
258
259#[macro_export]
260macro_rules! add_compressed_service {
261    ($builder:expr, $server:expr) => {
262        $builder.add_service(
263            $server
264                .accept_compressed(CompressionEncoding::Gzip)
265                .accept_compressed(CompressionEncoding::Zstd)
266                .send_compressed(CompressionEncoding::Gzip)
267                .send_compressed(CompressionEncoding::Zstd),
268        )
269    };
270}
271
272pub fn router(metasrv: Arc<Metasrv>) -> Router {
273    let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
274    let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
275    let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
276    let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
277    let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
278    router.add_service(admin::make_admin_service(metasrv))
279}
280
281pub async fn metasrv_builder(
282    opts: &MetasrvOptions,
283    plugins: Plugins,
284    kv_backend: Option<KvBackendRef>,
285) -> Result<MetasrvBuilder> {
286    let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
287        (Some(kv_backend), _) => (kv_backend, None),
288        (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
289        (None, BackendImpl::EtcdStore) => {
290            let etcd_client =
291                create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
292            let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
293            let election = EtcdElection::with_etcd_client(
294                &opts.grpc.server_addr,
295                etcd_client,
296                opts.store_key_prefix.clone(),
297            )
298            .await?;
299
300            (kv_backend, Some(election))
301        }
302        #[cfg(feature = "pg_kvbackend")]
303        (None, BackendImpl::PostgresStore) => {
304            use std::time::Duration;
305
306            use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
307
308            use crate::election::rds::postgres::ElectionPgClient;
309
310            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
311            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
312            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
313            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
314            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
315
316            let mut cfg = Config::new();
317            cfg.keepalives = Some(true);
318            cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
319            // We use a separate pool for election since we need a different session keep-alive idle time.
320            let pool =
321                create_postgres_pool_with(&opts.store_addrs, cfg, opts.backend_tls.clone()).await?;
322
323            let election_client = ElectionPgClient::new(
324                pool,
325                execution_timeout,
326                idle_session_timeout,
327                statement_timeout,
328            )?;
329            let election = PgElection::with_pg_client(
330                opts.grpc.server_addr.clone(),
331                election_client,
332                opts.store_key_prefix.clone(),
333                candidate_lease_ttl,
334                meta_lease_ttl,
335                opts.meta_schema_name.as_deref(),
336                &opts.meta_table_name,
337                opts.meta_election_lock_id,
338            )
339            .await?;
340
341            let pool = create_postgres_pool(&opts.store_addrs, opts.backend_tls.clone()).await?;
342            let kv_backend = PgStore::with_pg_pool(
343                pool,
344                opts.meta_schema_name.as_deref(),
345                &opts.meta_table_name,
346                opts.max_txn_ops,
347            )
348            .await
349            .context(error::KvBackendSnafu)?;
350
351            (kv_backend, Some(election))
352        }
353        #[cfg(feature = "mysql_kvbackend")]
354        (None, BackendImpl::MysqlStore) => {
355            use std::time::Duration;
356
357            use crate::election::rds::mysql::ElectionMysqlClient;
358
359            let pool = create_mysql_pool(&opts.store_addrs).await?;
360            let kv_backend =
361                MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
362                    .await
363                    .context(error::KvBackendSnafu)?;
364            // Since election will acquire a lock of the table, we need a separate table for election.
365            let election_table_name = opts.meta_table_name.clone() + "_election";
366            // We use a separate pool for election since we need a different session keep-alive idle time.
367            let pool = create_mysql_pool(&opts.store_addrs).await?;
368            let execution_timeout = Duration::from_secs(META_LEASE_SECS);
369            let statement_timeout = Duration::from_secs(META_LEASE_SECS);
370            let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
371            let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
372            let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
373            let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
374
375            let election_client = ElectionMysqlClient::new(
376                pool,
377                execution_timeout,
378                statement_timeout,
379                innode_lock_wait_timeout,
380                idle_session_timeout,
381                &election_table_name,
382            );
383            let election = MySqlElection::with_mysql_client(
384                opts.grpc.server_addr.clone(),
385                election_client,
386                opts.store_key_prefix.clone(),
387                candidate_lease_ttl,
388                meta_lease_ttl,
389                &election_table_name,
390            )
391            .await?;
392            (kv_backend, Some(election))
393        }
394    };
395
396    if !opts.store_key_prefix.is_empty() {
397        info!(
398            "using chroot kv backend with prefix: {prefix}",
399            prefix = opts.store_key_prefix
400        );
401        kv_backend = Arc::new(ChrootKvBackend::new(
402            opts.store_key_prefix.clone().into_bytes(),
403            kv_backend,
404        ))
405    }
406
407    let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
408
409    let node_excluder = plugins
410        .get::<NodeExcluderRef>()
411        .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
412    let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
413        info!("Using selector from plugins");
414        selector
415    } else {
416        let selector = match opts.selector {
417            SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
418                RegionNumsBasedWeightCompute,
419                node_excluder,
420            )) as SelectorRef,
421            SelectorType::LeaseBased => {
422                Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
423            }
424            SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
425                SelectTarget::Datanode,
426                node_excluder,
427            )) as SelectorRef,
428        };
429        info!(
430            "Using selector from options, selector type: {}",
431            opts.selector.as_ref()
432        );
433        selector
434    };
435
436    Ok(MetasrvBuilder::new()
437        .options(opts.clone())
438        .kv_backend(kv_backend)
439        .in_memory(in_memory)
440        .selector(selector)
441        .election(election)
442        .plugins(plugins))
443}
444
445pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
446    create_etcd_client_with_tls(store_addrs, None).await
447}
448
449fn build_connection_options(tls_config: Option<&TlsOption>) -> Result<Option<ConnectOptions>> {
450    use std::fs;
451
452    use common_telemetry::debug;
453    use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions};
454    use servers::tls::TlsMode;
455
456    // If TLS options are not provided, return None
457    let Some(tls_config) = tls_config else {
458        return Ok(None);
459    };
460    // If TLS is disabled, return None
461    if matches!(tls_config.mode, TlsMode::Disable) {
462        return Ok(None);
463    }
464    let mut etcd_tls_opts = TlsOptions::new();
465    // Set CA certificate if provided
466    if !tls_config.ca_cert_path.is_empty() {
467        debug!("Using CA certificate from {}", tls_config.ca_cert_path);
468        let ca_cert_pem = fs::read(&tls_config.ca_cert_path).context(error::FileIoSnafu {
469            path: &tls_config.ca_cert_path,
470        })?;
471        let ca_cert = Certificate::from_pem(ca_cert_pem);
472        etcd_tls_opts = etcd_tls_opts.ca_certificate(ca_cert);
473    }
474    // Set client identity (cert + key) if both are provided
475    if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() {
476        debug!(
477            "Using client certificate from {} and key from {}",
478            tls_config.cert_path, tls_config.key_path
479        );
480        let cert_pem = fs::read(&tls_config.cert_path).context(error::FileIoSnafu {
481            path: &tls_config.cert_path,
482        })?;
483        let key_pem = fs::read(&tls_config.key_path).context(error::FileIoSnafu {
484            path: &tls_config.key_path,
485        })?;
486        let identity = Identity::from_pem(cert_pem, key_pem);
487        etcd_tls_opts = etcd_tls_opts.identity(identity);
488    }
489    // Enable native TLS roots for additional trust anchors
490    etcd_tls_opts = etcd_tls_opts.with_native_roots();
491    Ok(Some(ConnectOptions::new().with_tls(etcd_tls_opts)))
492}
493
494pub async fn create_etcd_client_with_tls(
495    store_addrs: &[String],
496    tls_config: Option<&TlsOption>,
497) -> Result<Client> {
498    let etcd_endpoints = store_addrs
499        .iter()
500        .map(|x| x.trim())
501        .filter(|x| !x.is_empty())
502        .collect::<Vec<_>>();
503
504    let connect_options = build_connection_options(tls_config)?;
505
506    Client::connect(&etcd_endpoints, connect_options)
507        .await
508        .context(error::ConnectEtcdSnafu)
509}
510
511#[cfg(feature = "pg_kvbackend")]
512/// Converts servers::tls::TlsOption to postgres::TlsOption to avoid circular dependencies
513fn convert_tls_option(tls_option: &TlsOption) -> PgTlsOption {
514    let mode = match tls_option.mode {
515        servers::tls::TlsMode::Disable => PgTlsMode::Disable,
516        servers::tls::TlsMode::Prefer => PgTlsMode::Prefer,
517        servers::tls::TlsMode::Require => PgTlsMode::Require,
518        servers::tls::TlsMode::VerifyCa => PgTlsMode::VerifyCa,
519        servers::tls::TlsMode::VerifyFull => PgTlsMode::VerifyFull,
520    };
521
522    PgTlsOption {
523        mode,
524        cert_path: tls_option.cert_path.clone(),
525        key_path: tls_option.key_path.clone(),
526        ca_cert_path: tls_option.ca_cert_path.clone(),
527        watch: tls_option.watch,
528    }
529}
530
531#[cfg(feature = "pg_kvbackend")]
532/// Creates a pool for the Postgres backend with optional TLS.
533///
534/// It only use first store addr to create a pool.
535pub async fn create_postgres_pool(
536    store_addrs: &[String],
537    tls_config: Option<TlsOption>,
538) -> Result<deadpool_postgres::Pool> {
539    create_postgres_pool_with(store_addrs, Config::new(), tls_config).await
540}
541
542#[cfg(feature = "pg_kvbackend")]
543/// Creates a pool for the Postgres backend with config and optional TLS.
544///
545/// It only use first store addr to create a pool, and use the given config to create a pool.
546pub async fn create_postgres_pool_with(
547    store_addrs: &[String],
548    mut cfg: Config,
549    tls_config: Option<TlsOption>,
550) -> Result<deadpool_postgres::Pool> {
551    let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
552        err_msg: "empty store addrs",
553    })?;
554    cfg.url = Some(postgres_url.to_string());
555
556    let pool = if let Some(tls_config) = tls_config {
557        let pg_tls_config = convert_tls_option(&tls_config);
558        let tls_connector =
559            create_postgres_tls_connector(&pg_tls_config).map_err(|e| error::Error::Other {
560                source: BoxedError::new(e),
561                location: snafu::Location::new(file!(), line!(), 0),
562            })?;
563        cfg.create_pool(Some(Runtime::Tokio1), tls_connector)
564            .context(error::CreatePostgresPoolSnafu)?
565    } else {
566        cfg.create_pool(Some(Runtime::Tokio1), NoTls)
567            .context(error::CreatePostgresPoolSnafu)?
568    };
569
570    Ok(pool)
571}
572
573#[cfg(feature = "mysql_kvbackend")]
574async fn setup_mysql_options(store_addrs: &[String]) -> Result<MySqlConnectOptions> {
575    let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
576        err_msg: "empty store addrs",
577    })?;
578    // Avoid `SET` commands in sqlx
579    let opts: MySqlConnectOptions = mysql_url
580        .parse()
581        .context(error::ParseMySqlUrlSnafu { mysql_url })?;
582    let opts = opts
583        .no_engine_substitution(false)
584        .pipes_as_concat(false)
585        .timezone(None)
586        .set_names(false);
587    Ok(opts)
588}
589
590#[cfg(feature = "mysql_kvbackend")]
591pub async fn create_mysql_pool(store_addrs: &[String]) -> Result<MySqlPool> {
592    let opts = setup_mysql_options(store_addrs).await?;
593    let pool = MySqlPool::connect_with(opts)
594        .await
595        .context(error::CreateMySqlPoolSnafu)?;
596
597    Ok(pool)
598}
599
600#[cfg(test)]
601mod tests {
602    use servers::tls::TlsMode;
603
604    use super::*;
605
606    #[tokio::test]
607    async fn test_create_etcd_client_tls_without_certs() {
608        let endpoints: Vec<String> = match std::env::var("GT_ETCD_TLS_ENDPOINTS") {
609            Ok(endpoints_str) => endpoints_str
610                .split(',')
611                .map(|s| s.trim().to_string())
612                .collect(),
613            Err(_) => return,
614        };
615
616        let tls_config = TlsOption {
617            mode: TlsMode::Require,
618            ca_cert_path: String::new(),
619            cert_path: String::new(),
620            key_path: String::new(),
621            watch: false,
622        };
623
624        let _client = create_etcd_client_with_tls(&endpoints, Some(&tls_config))
625            .await
626            .unwrap();
627    }
628
629    #[tokio::test]
630    async fn test_create_etcd_client_tls_with_client_certs() {
631        let endpoints: Vec<String> = match std::env::var("GT_ETCD_TLS_ENDPOINTS") {
632            Ok(endpoints_str) => endpoints_str
633                .split(',')
634                .map(|s| s.trim().to_string())
635                .collect(),
636            Err(_) => return,
637        };
638
639        let cert_dir = std::env::current_dir()
640            .unwrap()
641            .join("tests-integration")
642            .join("fixtures")
643            .join("etcd-tls-certs");
644
645        if cert_dir.join("client.crt").exists() && cert_dir.join("client-key.pem").exists() {
646            let tls_config = TlsOption {
647                mode: TlsMode::Require,
648                ca_cert_path: String::new(),
649                cert_path: cert_dir.join("client.crt").to_string_lossy().to_string(),
650                key_path: cert_dir
651                    .join("client-key.pem")
652                    .to_string_lossy()
653                    .to_string(),
654                watch: false,
655            };
656
657            let _client = create_etcd_client_with_tls(&endpoints, Some(&tls_config))
658                .await
659                .unwrap();
660        }
661    }
662
663    #[tokio::test]
664    async fn test_create_etcd_client_tls_with_full_certs() {
665        let endpoints: Vec<String> = match std::env::var("GT_ETCD_TLS_ENDPOINTS") {
666            Ok(endpoints_str) => endpoints_str
667                .split(',')
668                .map(|s| s.trim().to_string())
669                .collect(),
670            Err(_) => return,
671        };
672
673        let cert_dir = std::env::current_dir()
674            .unwrap()
675            .join("tests-integration")
676            .join("fixtures")
677            .join("etcd-tls-certs");
678
679        if cert_dir.join("ca.crt").exists()
680            && cert_dir.join("client.crt").exists()
681            && cert_dir.join("client-key.pem").exists()
682        {
683            let tls_config = TlsOption {
684                mode: TlsMode::Require,
685                ca_cert_path: cert_dir.join("ca.crt").to_string_lossy().to_string(),
686                cert_path: cert_dir.join("client.crt").to_string_lossy().to_string(),
687                key_path: cert_dir
688                    .join("client-key.pem")
689                    .to_string_lossy()
690                    .to_string(),
691                watch: false,
692            };
693
694            let _client = create_etcd_client_with_tls(&endpoints, Some(&tls_config))
695                .await
696                .unwrap();
697        }
698    }
699}