1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use common_base::Plugins;
23use common_config::Configurable;
24#[cfg(feature = "pg_kvbackend")]
25use common_error::ext::BoxedError;
26#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
27use common_meta::distributed_time_constants::META_LEASE_SECS;
28use common_meta::kv_backend::chroot::ChrootKvBackend;
29use common_meta::kv_backend::etcd::EtcdStore;
30use common_meta::kv_backend::memory::MemoryKvBackend;
31#[cfg(feature = "pg_kvbackend")]
32use common_meta::kv_backend::rds::postgres::create_postgres_tls_connector;
33#[cfg(feature = "pg_kvbackend")]
34use common_meta::kv_backend::rds::postgres::{TlsMode as PgTlsMode, TlsOption as PgTlsOption};
35#[cfg(feature = "mysql_kvbackend")]
36use common_meta::kv_backend::rds::MySqlStore;
37#[cfg(feature = "pg_kvbackend")]
38use common_meta::kv_backend::rds::PgStore;
39use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
40use common_telemetry::info;
41#[cfg(feature = "pg_kvbackend")]
42use deadpool_postgres::{Config, Runtime};
43use either::Either;
44use etcd_client::{Client, ConnectOptions};
45use servers::configurator::ConfiguratorRef;
46use servers::export_metrics::ExportMetricsTask;
47use servers::http::{HttpServer, HttpServerBuilder};
48use servers::metrics_handler::MetricsHandler;
49use servers::server::Server;
50use servers::tls::TlsOption;
51#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
52use snafu::OptionExt;
53use snafu::ResultExt;
54#[cfg(feature = "mysql_kvbackend")]
55use sqlx::mysql::MySqlConnectOptions;
56#[cfg(feature = "mysql_kvbackend")]
57use sqlx::mysql::MySqlPool;
58use tokio::net::TcpListener;
59use tokio::sync::mpsc::{self, Receiver, Sender};
60use tokio::sync::{oneshot, Mutex};
61#[cfg(feature = "pg_kvbackend")]
62use tokio_postgres::NoTls;
63use tonic::codec::CompressionEncoding;
64use tonic::transport::server::{Router, TcpIncoming};
65
66use crate::election::etcd::EtcdElection;
67#[cfg(feature = "mysql_kvbackend")]
68use crate::election::rds::mysql::MySqlElection;
69#[cfg(feature = "pg_kvbackend")]
70use crate::election::rds::postgres::PgElection;
71#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
72use crate::election::CANDIDATE_LEASE_SECS;
73use crate::metasrv::builder::MetasrvBuilder;
74use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
75use crate::node_excluder::NodeExcluderRef;
76use crate::selector::lease_based::LeaseBasedSelector;
77use crate::selector::load_based::LoadBasedSelector;
78use crate::selector::round_robin::RoundRobinSelector;
79use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
80use crate::selector::SelectorType;
81use crate::service::admin;
82use crate::service::admin::admin_axum_router;
83use crate::{error, Result};
84
/// A [`Metasrv`] bundled with the HTTP and gRPC servers that expose it, plus the state
/// needed to start and shut them down.
pub struct MetasrvInstance {
    /// The wrapped Metasrv; clones of this `Arc` are handed to the HTTP admin router and
    /// to the gRPC services.
    metasrv: Arc<Metasrv>,

    /// The HTTP server. Holds the builder (`Left`) until `start` consumes it, and the
    /// built server (`Right`) once that server has been started.
    http_server: Either<Option<HttpServerBuilder>, HttpServer>,

    /// A copy of the options the Metasrv was created with.
    opts: MetasrvOptions,

    /// Sender used by `shutdown` to tell the gRPC server task to stop.
    signal_sender: Option<Sender<()>>,

    /// A clone of the Metasrv's plugins; `new` also registers the `Arc<Metasrv>` in it.
    plugins: Plugins,

    /// The export-metrics task created in `new`, if any; started by `start`.
    export_metrics_task: Option<ExportMetricsTask>,

    /// Receiver for the final result of the gRPC serving task. Set by `start` and taken
    /// by `shutdown`.
    serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,

    /// The address the gRPC listener is bound to; `None` until `start` has bound it.
    bind_addr: Option<SocketAddr>,
}
104
105impl MetasrvInstance {
106 pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
107 let opts = metasrv.options().clone();
108 let plugins = metasrv.plugins().clone();
109 let metasrv = Arc::new(metasrv);
110
111 let extra_routers = admin_axum_router(metasrv.clone());
113
114 let mut builder = HttpServerBuilder::new(opts.http.clone())
115 .with_metrics_handler(MetricsHandler)
116 .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
117 builder = builder.with_extra_router(extra_routers);
118
119 plugins.insert::<Arc<Metasrv>>(metasrv.clone());
121 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
122 .context(error::InitExportMetricsTaskSnafu)?;
123 Ok(MetasrvInstance {
124 metasrv,
125 http_server: Either::Left(Some(builder)),
126 opts,
127 signal_sender: None,
128 plugins,
129 export_metrics_task,
130 serve_state: Default::default(),
131 bind_addr: None,
132 })
133 }
134
135 pub async fn start(&mut self) -> Result<()> {
136 if let Some(builder) = self.http_server.as_mut().left()
137 && let Some(builder) = builder.take()
138 {
139 let mut server = builder.build();
140
141 let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
142 addr: &self.opts.http.addr,
143 })?;
144 info!("starting http server at {}", addr);
145 server.start(addr).await.context(error::StartHttpSnafu)?;
146
147 self.http_server = Either::Right(server);
148 } else {
149 return Ok(());
153 };
154
155 self.metasrv.try_start().await?;
156
157 if let Some(t) = self.export_metrics_task.as_ref() {
158 t.start(None).context(error::InitExportMetricsTaskSnafu)?
159 }
160
161 let (tx, rx) = mpsc::channel::<()>(1);
162
163 self.signal_sender = Some(tx);
164
165 let mut router = router(self.metasrv.clone());
167 if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
168 router = configurator.config_grpc(router);
169 }
170
171 let (serve_state_tx, serve_state_rx) = oneshot::channel();
172
173 let socket_addr =
174 bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
175 .await?;
176 self.bind_addr = Some(socket_addr);
177
178 *self.serve_state.lock().await = Some(serve_state_rx);
179 Ok(())
180 }
181
182 pub async fn shutdown(&self) -> Result<()> {
183 if let Some(mut rx) = self.serve_state.lock().await.take() {
184 if let Ok(Err(err)) = rx.try_recv() {
185 common_telemetry::error!(err; "Metasrv start failed")
186 }
187 }
188 if let Some(signal) = &self.signal_sender {
189 signal
190 .send(())
191 .await
192 .context(error::SendShutdownSignalSnafu)?;
193 }
194 self.metasrv.shutdown().await?;
195
196 if let Some(http_server) = self.http_server.as_ref().right() {
197 http_server
198 .shutdown()
199 .await
200 .context(error::ShutdownServerSnafu {
201 server: http_server.name(),
202 })?;
203 }
204 Ok(())
205 }
206
207 pub fn plugins(&self) -> Plugins {
208 self.plugins.clone()
209 }
210
211 pub fn get_inner(&self) -> &Metasrv {
212 &self.metasrv
213 }
214 pub fn bind_addr(&self) -> &Option<SocketAddr> {
215 &self.bind_addr
216 }
217
218 pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
219 &mut self.http_server
220 }
221
222 pub fn http_server(&self) -> Option<&HttpServer> {
223 self.http_server.as_ref().right()
224 }
225}
226
227pub async fn bootstrap_metasrv_with_router(
228 bind_addr: &str,
229 router: Router,
230 serve_state_tx: oneshot::Sender<Result<()>>,
231 mut shutdown_rx: Receiver<()>,
232) -> Result<SocketAddr> {
233 let listener = TcpListener::bind(bind_addr)
234 .await
235 .context(error::TcpBindSnafu { addr: bind_addr })?;
236
237 let real_bind_addr = listener
238 .local_addr()
239 .context(error::TcpBindSnafu { addr: bind_addr })?;
240
241 info!("gRPC server is bound to: {}", real_bind_addr);
242
243 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
244
245 let _handle = common_runtime::spawn_global(async move {
246 let result = router
247 .serve_with_incoming_shutdown(incoming, async {
248 let _ = shutdown_rx.recv().await;
249 })
250 .await
251 .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
252 .context(error::StartGrpcSnafu);
253 let _ = serve_state_tx.send(result);
254 });
255
256 Ok(real_bind_addr)
257}
258
/// Registers `$server` on `$builder` through `add_service`, after enabling the Gzip and
/// Zstd encodings on it with both `accept_compressed` and `send_compressed`.
///
/// Expands to the value returned by `add_service`. `CompressionEncoding` is referenced
/// unqualified, so it has to be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! add_compressed_service {
    ($builder:expr, $server:expr) => {
        $builder.add_service(
            $server
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd),
        )
    };
}
271
272pub fn router(metasrv: Arc<Metasrv>) -> Router {
273 let mut router = tonic::transport::Server::builder().accept_http1(true); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
275 let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
276 let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
277 let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
278 router.add_service(admin::make_admin_service(metasrv))
279}
280
281pub async fn metasrv_builder(
282 opts: &MetasrvOptions,
283 plugins: Plugins,
284 kv_backend: Option<KvBackendRef>,
285) -> Result<MetasrvBuilder> {
286 let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
287 (Some(kv_backend), _) => (kv_backend, None),
288 (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
289 (None, BackendImpl::EtcdStore) => {
290 let etcd_client =
291 create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
292 let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
293 let election = EtcdElection::with_etcd_client(
294 &opts.grpc.server_addr,
295 etcd_client,
296 opts.store_key_prefix.clone(),
297 )
298 .await?;
299
300 (kv_backend, Some(election))
301 }
302 #[cfg(feature = "pg_kvbackend")]
303 (None, BackendImpl::PostgresStore) => {
304 use std::time::Duration;
305
306 use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
307
308 use crate::election::rds::postgres::ElectionPgClient;
309
310 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
311 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
312 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
313 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
314 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
315
316 let mut cfg = Config::new();
317 cfg.keepalives = Some(true);
318 cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
319 let pool =
321 create_postgres_pool_with(&opts.store_addrs, cfg, opts.backend_tls.clone()).await?;
322
323 let election_client = ElectionPgClient::new(
324 pool,
325 execution_timeout,
326 idle_session_timeout,
327 statement_timeout,
328 )?;
329 let election = PgElection::with_pg_client(
330 opts.grpc.server_addr.clone(),
331 election_client,
332 opts.store_key_prefix.clone(),
333 candidate_lease_ttl,
334 meta_lease_ttl,
335 opts.meta_schema_name.as_deref(),
336 &opts.meta_table_name,
337 opts.meta_election_lock_id,
338 )
339 .await?;
340
341 let pool = create_postgres_pool(&opts.store_addrs, opts.backend_tls.clone()).await?;
342 let kv_backend = PgStore::with_pg_pool(
343 pool,
344 opts.meta_schema_name.as_deref(),
345 &opts.meta_table_name,
346 opts.max_txn_ops,
347 )
348 .await
349 .context(error::KvBackendSnafu)?;
350
351 (kv_backend, Some(election))
352 }
353 #[cfg(feature = "mysql_kvbackend")]
354 (None, BackendImpl::MysqlStore) => {
355 use std::time::Duration;
356
357 use crate::election::rds::mysql::ElectionMysqlClient;
358
359 let pool = create_mysql_pool(&opts.store_addrs).await?;
360 let kv_backend =
361 MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
362 .await
363 .context(error::KvBackendSnafu)?;
364 let election_table_name = opts.meta_table_name.clone() + "_election";
366 let pool = create_mysql_pool(&opts.store_addrs).await?;
368 let execution_timeout = Duration::from_secs(META_LEASE_SECS);
369 let statement_timeout = Duration::from_secs(META_LEASE_SECS);
370 let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
371 let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
372 let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
373 let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
374
375 let election_client = ElectionMysqlClient::new(
376 pool,
377 execution_timeout,
378 statement_timeout,
379 innode_lock_wait_timeout,
380 idle_session_timeout,
381 &election_table_name,
382 );
383 let election = MySqlElection::with_mysql_client(
384 opts.grpc.server_addr.clone(),
385 election_client,
386 opts.store_key_prefix.clone(),
387 candidate_lease_ttl,
388 meta_lease_ttl,
389 &election_table_name,
390 )
391 .await?;
392 (kv_backend, Some(election))
393 }
394 };
395
396 if !opts.store_key_prefix.is_empty() {
397 info!(
398 "using chroot kv backend with prefix: {prefix}",
399 prefix = opts.store_key_prefix
400 );
401 kv_backend = Arc::new(ChrootKvBackend::new(
402 opts.store_key_prefix.clone().into_bytes(),
403 kv_backend,
404 ))
405 }
406
407 let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
408
409 let node_excluder = plugins
410 .get::<NodeExcluderRef>()
411 .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
412 let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
413 info!("Using selector from plugins");
414 selector
415 } else {
416 let selector = match opts.selector {
417 SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
418 RegionNumsBasedWeightCompute,
419 node_excluder,
420 )) as SelectorRef,
421 SelectorType::LeaseBased => {
422 Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
423 }
424 SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
425 SelectTarget::Datanode,
426 node_excluder,
427 )) as SelectorRef,
428 };
429 info!(
430 "Using selector from options, selector type: {}",
431 opts.selector.as_ref()
432 );
433 selector
434 };
435
436 Ok(MetasrvBuilder::new()
437 .options(opts.clone())
438 .kv_backend(kv_backend)
439 .in_memory(in_memory)
440 .selector(selector)
441 .election(election)
442 .plugins(plugins))
443}
444
445pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
446 create_etcd_client_with_tls(store_addrs, None).await
447}
448
449fn build_connection_options(tls_config: Option<&TlsOption>) -> Result<Option<ConnectOptions>> {
450 use std::fs;
451
452 use common_telemetry::debug;
453 use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions};
454 use servers::tls::TlsMode;
455
456 let Some(tls_config) = tls_config else {
458 return Ok(None);
459 };
460 if matches!(tls_config.mode, TlsMode::Disable) {
462 return Ok(None);
463 }
464 let mut etcd_tls_opts = TlsOptions::new();
465 if !tls_config.ca_cert_path.is_empty() {
467 debug!("Using CA certificate from {}", tls_config.ca_cert_path);
468 let ca_cert_pem = fs::read(&tls_config.ca_cert_path).context(error::FileIoSnafu {
469 path: &tls_config.ca_cert_path,
470 })?;
471 let ca_cert = Certificate::from_pem(ca_cert_pem);
472 etcd_tls_opts = etcd_tls_opts.ca_certificate(ca_cert);
473 }
474 if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() {
476 debug!(
477 "Using client certificate from {} and key from {}",
478 tls_config.cert_path, tls_config.key_path
479 );
480 let cert_pem = fs::read(&tls_config.cert_path).context(error::FileIoSnafu {
481 path: &tls_config.cert_path,
482 })?;
483 let key_pem = fs::read(&tls_config.key_path).context(error::FileIoSnafu {
484 path: &tls_config.key_path,
485 })?;
486 let identity = Identity::from_pem(cert_pem, key_pem);
487 etcd_tls_opts = etcd_tls_opts.identity(identity);
488 }
489 etcd_tls_opts = etcd_tls_opts.with_native_roots();
491 Ok(Some(ConnectOptions::new().with_tls(etcd_tls_opts)))
492}
493
494pub async fn create_etcd_client_with_tls(
495 store_addrs: &[String],
496 tls_config: Option<&TlsOption>,
497) -> Result<Client> {
498 let etcd_endpoints = store_addrs
499 .iter()
500 .map(|x| x.trim())
501 .filter(|x| !x.is_empty())
502 .collect::<Vec<_>>();
503
504 let connect_options = build_connection_options(tls_config)?;
505
506 Client::connect(&etcd_endpoints, connect_options)
507 .await
508 .context(error::ConnectEtcdSnafu)
509}
510
/// Converts the server-side [`TlsOption`] into the Postgres backend's [`PgTlsOption`].
///
/// The mode is mapped variant by variant; paths and the `watch` flag are copied over.
#[cfg(feature = "pg_kvbackend")]
fn convert_tls_option(tls_option: &TlsOption) -> PgTlsOption {
    use servers::tls::TlsMode;

    let pg_mode = match tls_option.mode {
        TlsMode::Disable => PgTlsMode::Disable,
        TlsMode::Prefer => PgTlsMode::Prefer,
        TlsMode::Require => PgTlsMode::Require,
        TlsMode::VerifyCa => PgTlsMode::VerifyCa,
        TlsMode::VerifyFull => PgTlsMode::VerifyFull,
    };

    let cert_path = tls_option.cert_path.clone();
    let key_path = tls_option.key_path.clone();
    let ca_cert_path = tls_option.ca_cert_path.clone();

    PgTlsOption {
        mode: pg_mode,
        cert_path,
        key_path,
        ca_cert_path,
        watch: tls_option.watch,
    }
}
530
/// Creates a Postgres connection pool with the default pool configuration.
///
/// Shorthand for [`create_postgres_pool_with`] called with `Config::new()`.
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool(
    store_addrs: &[String],
    tls_config: Option<TlsOption>,
) -> Result<deadpool_postgres::Pool> {
    let default_cfg = Config::new();
    create_postgres_pool_with(store_addrs, default_cfg, tls_config).await
}
541
/// Creates a Postgres connection pool from a caller-supplied pool configuration.
///
/// Only the first entry of `store_addrs` is used; it overwrites `cfg.url`. With a TLS
/// option the pool is created with a TLS connector built from it, otherwise with `NoTls`.
///
/// # Errors
///
/// Fails if `store_addrs` is empty, the TLS connector cannot be built or the pool cannot
/// be created.
#[cfg(feature = "pg_kvbackend")]
pub async fn create_postgres_pool_with(
    store_addrs: &[String],
    mut cfg: Config,
    tls_config: Option<TlsOption>,
) -> Result<deadpool_postgres::Pool> {
    let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
        err_msg: "empty store addrs",
    })?;
    cfg.url = Some(postgres_url.clone());

    let created = match tls_config {
        None => cfg.create_pool(Some(Runtime::Tokio1), NoTls),
        Some(tls_option) => {
            let pg_tls_option = convert_tls_option(&tls_option);
            let connector =
                create_postgres_tls_connector(&pg_tls_option).map_err(|e| error::Error::Other {
                    source: BoxedError::new(e),
                    location: snafu::Location::new(file!(), line!(), 0),
                })?;
            cfg.create_pool(Some(Runtime::Tokio1), connector)
        }
    };

    created.context(error::CreatePostgresPoolSnafu)
}
572
/// Parses the first entry of `store_addrs` into MySQL connect options.
///
/// Engine substitution, pipes-as-concat, the timezone and `SET NAMES` handling are all
/// switched off on the parsed options.
/// NOTE(review): presumably this keeps sqlx from issuing session `SET` statements on
/// connect — confirm against the sqlx documentation.
///
/// # Errors
///
/// Fails if `store_addrs` is empty or the URL cannot be parsed.
#[cfg(feature = "mysql_kvbackend")]
async fn setup_mysql_options(store_addrs: &[String]) -> Result<MySqlConnectOptions> {
    let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
        err_msg: "empty store addrs",
    })?;

    let parsed = mysql_url
        .parse::<MySqlConnectOptions>()
        .context(error::ParseMySqlUrlSnafu { mysql_url })?;

    Ok(parsed
        .no_engine_substitution(false)
        .pipes_as_concat(false)
        .timezone(None)
        .set_names(false))
}
589
/// Creates a MySQL connection pool for the first entry of `store_addrs`.
///
/// # Errors
///
/// Fails if the connect options cannot be built or the pool cannot connect.
#[cfg(feature = "mysql_kvbackend")]
pub async fn create_mysql_pool(store_addrs: &[String]) -> Result<MySqlPool> {
    let connect_options = setup_mysql_options(store_addrs).await?;
    MySqlPool::connect_with(connect_options)
        .await
        .context(error::CreateMySqlPoolSnafu)
}
599
#[cfg(test)]
mod tests {
    use servers::tls::TlsMode;

    use super::*;

    /// Reads the comma-separated etcd endpoints from `GT_ETCD_TLS_ENDPOINTS`.
    ///
    /// Returns `None` when the variable cannot be read, in which case the tests below
    /// return early without connecting.
    fn tls_endpoints() -> Option<Vec<String>> {
        let raw = std::env::var("GT_ETCD_TLS_ENDPOINTS").ok()?;
        Some(raw.split(',').map(|s| s.trim().to_string()).collect())
    }

    /// Directory expected to hold the etcd TLS fixtures, relative to the current directory.
    fn etcd_cert_dir() -> std::path::PathBuf {
        std::env::current_dir()
            .unwrap()
            .join("tests-integration")
            .join("fixtures")
            .join("etcd-tls-certs")
    }

    /// Renders a path as an owned string, replacing invalid UTF-8 lossily.
    fn path_string(path: &std::path::Path) -> String {
        path.to_string_lossy().to_string()
    }

    /// Connects with `tls_config` and panics if the connection fails.
    async fn connect_or_panic(endpoints: &[String], tls_config: &TlsOption) {
        let _client = create_etcd_client_with_tls(endpoints, Some(tls_config))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_create_etcd_client_tls_without_certs() {
        let Some(endpoints) = tls_endpoints() else {
            return;
        };

        let tls_config = TlsOption {
            mode: TlsMode::Require,
            ca_cert_path: String::new(),
            cert_path: String::new(),
            key_path: String::new(),
            watch: false,
        };

        connect_or_panic(&endpoints, &tls_config).await;
    }

    #[tokio::test]
    async fn test_create_etcd_client_tls_with_client_certs() {
        let Some(endpoints) = tls_endpoints() else {
            return;
        };

        let cert_dir = etcd_cert_dir();
        let client_cert = cert_dir.join("client.crt");
        let client_key = cert_dir.join("client-key.pem");

        // Nothing to test when the fixtures are missing.
        if !(client_cert.exists() && client_key.exists()) {
            return;
        }

        let tls_config = TlsOption {
            mode: TlsMode::Require,
            ca_cert_path: String::new(),
            cert_path: path_string(&client_cert),
            key_path: path_string(&client_key),
            watch: false,
        };

        connect_or_panic(&endpoints, &tls_config).await;
    }

    #[tokio::test]
    async fn test_create_etcd_client_tls_with_full_certs() {
        let Some(endpoints) = tls_endpoints() else {
            return;
        };

        let cert_dir = etcd_cert_dir();
        let ca_cert = cert_dir.join("ca.crt");
        let client_cert = cert_dir.join("client.crt");
        let client_key = cert_dir.join("client-key.pem");

        // Nothing to test when the fixtures are missing.
        if !(ca_cert.exists() && client_cert.exists() && client_key.exists()) {
            return;
        }

        let tls_config = TlsOption {
            mode: TlsMode::Require,
            ca_cert_path: path_string(&ca_cert),
            cert_path: path_string(&client_cert),
            key_path: path_string(&client_key),
            watch: false,
        };

        connect_or_panic(&endpoints, &tls_config).await;
    }
}