common_meta/kv_backend/
etcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::fs;
17use std::sync::Arc;
18
19use common_telemetry::{debug, info};
20use etcd_client::{
21    Certificate, Client, DeleteOptions, GetOptions, Identity, PutOptions, TlsOptions, Txn, TxnOp,
22    TxnOpResponse, TxnResponse,
23};
24use snafu::{ResultExt, ensure};
25
26use crate::error::{self, Error, LoadTlsCertificateSnafu, Result};
27use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
28use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
29use crate::metrics::METRIC_META_TXN_REQUEST;
30use crate::rpc::KeyValue;
31use crate::rpc::store::{
32    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
33    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
34    RangeRequest, RangeResponse,
35};
36
37const DEFAULT_MAX_DECODING_SIZE: usize = 32 * 1024 * 1024; // 32MB
38
39pub struct EtcdStore {
40    client: Client,
41    // Maximum number of operations permitted in a transaction.
42    // The etcd default configuration's `--max-txn-ops` is 128.
43    //
44    // For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
45    max_txn_ops: usize,
46    // Maximum decoding message size in bytes. Default 32MB.
47    max_decoding_size: usize,
48}
49
50impl EtcdStore {
51    pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
52    where
53        E: AsRef<str>,
54        S: AsRef<[E]>,
55    {
56        let client = Client::connect(endpoints, None)
57            .await
58            .context(error::ConnectEtcdSnafu)?;
59
60        Ok(Self::with_etcd_client(client, max_txn_ops))
61    }
62
63    pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
64        info!("Connected to etcd");
65        Arc::new(Self {
66            client,
67            max_txn_ops,
68            max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
69        })
70    }
71
72    pub fn set_max_decoding_size(&mut self, max_decoding_size: usize) {
73        self.max_decoding_size = max_decoding_size;
74    }
75
76    fn kv_client(&self) -> etcd_client::KvClient {
77        self.client
78            .kv_client()
79            .max_decoding_message_size(self.max_decoding_size)
80    }
81
82    async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
83        let max_txn_ops = self.max_txn_ops();
84        if txn_ops.len() < max_txn_ops {
85            // fast path
86            let _timer = METRIC_META_TXN_REQUEST
87                .with_label_values(&["etcd", "txn"])
88                .start_timer();
89            let txn = Txn::new().and_then(txn_ops);
90            let txn_res = self
91                .kv_client()
92                .txn(txn)
93                .await
94                .context(error::EtcdFailedSnafu)?;
95            return Ok(vec![txn_res]);
96        }
97
98        let txns = txn_ops
99            .chunks(max_txn_ops)
100            .map(|part| async move {
101                let _timer = METRIC_META_TXN_REQUEST
102                    .with_label_values(&["etcd", "txn"])
103                    .start_timer();
104                let txn = Txn::new().and_then(part);
105                self.kv_client().txn(txn).await
106            })
107            .collect::<Vec<_>>();
108
109        futures::future::try_join_all(txns)
110            .await
111            .context(error::EtcdFailedSnafu)
112    }
113}
114
115#[async_trait::async_trait]
116impl KvBackend for EtcdStore {
117    fn name(&self) -> &str {
118        "Etcd"
119    }
120
121    fn as_any(&self) -> &dyn Any {
122        self
123    }
124
125    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
126        let Get { key, options } = req.try_into()?;
127
128        let mut res = self
129            .kv_client()
130            .get(key, options)
131            .await
132            .context(error::EtcdFailedSnafu)?;
133
134        let kvs = res
135            .take_kvs()
136            .into_iter()
137            .map(KeyValue::from)
138            .collect::<Vec<_>>();
139
140        Ok(RangeResponse {
141            kvs,
142            more: res.more(),
143        })
144    }
145
146    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
147        let Put {
148            key,
149            value,
150            options,
151        } = req.try_into()?;
152
153        let mut res = self
154            .kv_client()
155            .put(key, value, options)
156            .await
157            .context(error::EtcdFailedSnafu)?;
158
159        let prev_kv = res.take_prev_key().map(KeyValue::from);
160        Ok(PutResponse { prev_kv })
161    }
162
163    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
164        let BatchPut { kvs, options } = req.try_into()?;
165
166        let put_ops = kvs
167            .into_iter()
168            .map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
169            .collect::<Vec<_>>();
170
171        let txn_responses = self.do_multi_txn(put_ops).await?;
172
173        let mut prev_kvs = vec![];
174        for txn_res in txn_responses {
175            for op_res in txn_res.op_responses() {
176                match op_res {
177                    TxnOpResponse::Put(mut put_res) => {
178                        if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
179                            prev_kvs.push(prev_kv);
180                        }
181                    }
182                    _ => unreachable!(),
183                }
184            }
185        }
186
187        Ok(BatchPutResponse { prev_kvs })
188    }
189
190    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
191        let BatchGet { keys, options } = req.try_into()?;
192
193        let get_ops: Vec<_> = keys
194            .into_iter()
195            .map(|key| TxnOp::get(key, options.clone()))
196            .collect();
197
198        let txn_responses = self.do_multi_txn(get_ops).await?;
199
200        let mut kvs = vec![];
201        for txn_res in txn_responses {
202            for op_res in txn_res.op_responses() {
203                let mut get_res = match op_res {
204                    TxnOpResponse::Get(get_res) => get_res,
205                    _ => unreachable!(),
206                };
207                kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
208            }
209        }
210
211        Ok(BatchGetResponse { kvs })
212    }
213
214    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
215        let Delete { key, options } = req.try_into()?;
216
217        let mut res = self
218            .kv_client()
219            .delete(key, options)
220            .await
221            .context(error::EtcdFailedSnafu)?;
222
223        let prev_kvs = res
224            .take_prev_kvs()
225            .into_iter()
226            .map(KeyValue::from)
227            .collect::<Vec<_>>();
228
229        Ok(DeleteRangeResponse {
230            deleted: res.deleted(),
231            prev_kvs,
232        })
233    }
234
235    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
236        let BatchDelete { keys, options } = req.try_into()?;
237
238        let mut prev_kvs = Vec::with_capacity(keys.len());
239
240        let delete_ops = keys
241            .into_iter()
242            .map(|key| TxnOp::delete(key, options.clone()))
243            .collect::<Vec<_>>();
244
245        let txn_responses = self.do_multi_txn(delete_ops).await?;
246
247        for txn_res in txn_responses {
248            for op_res in txn_res.op_responses() {
249                match op_res {
250                    TxnOpResponse::Delete(mut delete_res) => {
251                        delete_res
252                            .take_prev_kvs()
253                            .into_iter()
254                            .map(KeyValue::from)
255                            .for_each(|kv| {
256                                prev_kvs.push(kv);
257                            });
258                    }
259                    _ => unreachable!(),
260                }
261            }
262        }
263
264        Ok(BatchDeleteResponse { prev_kvs })
265    }
266}
267
268#[async_trait::async_trait]
269impl TxnService for EtcdStore {
270    type Error = Error;
271
272    async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
273        let _timer = METRIC_META_TXN_REQUEST
274            .with_label_values(&["etcd", "txn"])
275            .start_timer();
276
277        let max_operations = txn.max_operations();
278
279        let etcd_txn: Txn = txn.into();
280        let txn_res = self
281            .kv_client()
282            .txn(etcd_txn)
283            .await
284            .context(error::EtcdTxnFailedSnafu { max_operations })?;
285        txn_res.try_into()
286    }
287
288    fn max_txn_ops(&self) -> usize {
289        self.max_txn_ops
290    }
291}
292
293struct Get {
294    key: Vec<u8>,
295    options: Option<GetOptions>,
296}
297
298impl TryFrom<RangeRequest> for Get {
299    type Error = Error;
300
301    fn try_from(req: RangeRequest) -> Result<Self> {
302        let RangeRequest {
303            key,
304            range_end,
305            limit,
306            keys_only,
307        } = req;
308
309        ensure!(!key.is_empty(), error::EmptyKeySnafu);
310
311        let mut options = GetOptions::default();
312        if !range_end.is_empty() {
313            options = options.with_range(range_end);
314            if limit > 0 {
315                options = options.with_limit(limit);
316            }
317        }
318        if keys_only {
319            options = options.with_keys_only();
320        }
321
322        Ok(Get {
323            key,
324            options: Some(options),
325        })
326    }
327}
328
329struct Put {
330    key: Vec<u8>,
331    value: Vec<u8>,
332    options: Option<PutOptions>,
333}
334
335impl TryFrom<PutRequest> for Put {
336    type Error = Error;
337
338    fn try_from(req: PutRequest) -> Result<Self> {
339        let PutRequest {
340            key,
341            value,
342            prev_kv,
343        } = req;
344
345        let mut options = PutOptions::default();
346        if prev_kv {
347            options = options.with_prev_key();
348        }
349
350        Ok(Put {
351            key,
352            value,
353            options: Some(options),
354        })
355    }
356}
357
358struct BatchGet {
359    keys: Vec<Vec<u8>>,
360    options: Option<GetOptions>,
361}
362
363impl TryFrom<BatchGetRequest> for BatchGet {
364    type Error = Error;
365
366    fn try_from(req: BatchGetRequest) -> Result<Self> {
367        let BatchGetRequest { keys } = req;
368
369        let options = GetOptions::default();
370
371        Ok(BatchGet {
372            keys,
373            options: Some(options),
374        })
375    }
376}
377
378struct BatchPut {
379    kvs: Vec<KeyValue>,
380    options: Option<PutOptions>,
381}
382
383impl TryFrom<BatchPutRequest> for BatchPut {
384    type Error = Error;
385
386    fn try_from(req: BatchPutRequest) -> Result<Self> {
387        let BatchPutRequest { kvs, prev_kv } = req;
388
389        let mut options = PutOptions::default();
390        if prev_kv {
391            options = options.with_prev_key();
392        }
393
394        Ok(BatchPut {
395            kvs,
396            options: Some(options),
397        })
398    }
399}
400
401struct BatchDelete {
402    keys: Vec<Vec<u8>>,
403    options: Option<DeleteOptions>,
404}
405
406impl TryFrom<BatchDeleteRequest> for BatchDelete {
407    type Error = Error;
408
409    fn try_from(req: BatchDeleteRequest) -> Result<Self> {
410        let BatchDeleteRequest { keys, prev_kv } = req;
411
412        let mut options = DeleteOptions::default();
413        if prev_kv {
414            options = options.with_prev_key();
415        }
416
417        Ok(BatchDelete {
418            keys,
419            options: Some(options),
420        })
421    }
422}
423
424struct Delete {
425    key: Vec<u8>,
426    options: Option<DeleteOptions>,
427}
428
429impl TryFrom<DeleteRangeRequest> for Delete {
430    type Error = Error;
431
432    fn try_from(req: DeleteRangeRequest) -> Result<Self> {
433        let DeleteRangeRequest {
434            key,
435            range_end,
436            prev_kv,
437        } = req;
438
439        ensure!(!key.is_empty(), error::EmptyKeySnafu);
440
441        let mut options = DeleteOptions::default();
442        if !range_end.is_empty() {
443            options = options.with_range(range_end);
444        }
445        if prev_kv {
446            options = options.with_prev_key();
447        }
448
449        Ok(Delete {
450            key,
451            options: Some(options),
452        })
453    }
454}
455
456#[derive(Debug, Clone, PartialEq, Eq, Default)]
457pub enum TlsMode {
458    #[default]
459    Disable,
460    Require,
461}
462
463/// TLS configuration for Etcd connections.
464#[derive(Debug, Clone, PartialEq, Eq)]
465pub struct TlsOption {
466    pub mode: TlsMode,
467    pub cert_path: String,
468    pub key_path: String,
469    pub ca_cert_path: String,
470}
471
472/// Creates a Etcd [`TlsOptions`] from a [`TlsOption`].
473///
474/// This function builds the TLS options for etcd client connections based on the provided
475/// [`TlsOption`]. It supports disabling TLS, setting a custom CA certificate, and configuring
476/// client identity for mutual TLS authentication.
477///
478/// Note: All TlsMode variants except [`TlsMode::Disable`] will be treated as enabling TLS.
479pub fn create_etcd_tls_options(tls_config: &TlsOption) -> Result<Option<TlsOptions>> {
480    // If TLS mode is disabled, return None to indicate no TLS configuration.
481    if matches!(tls_config.mode, TlsMode::Disable) {
482        return Ok(None);
483    }
484
485    info!("Creating etcd TLS with mode: {:?}", tls_config.mode);
486    // Start with default TLS options.
487    let mut etcd_tls_opts = TlsOptions::new();
488
489    // If a CA certificate path is provided, load the CA certificate and add it to the options.
490    if !tls_config.ca_cert_path.is_empty() {
491        debug!("Using CA certificate from {}", tls_config.ca_cert_path);
492        let ca_cert_pem = fs::read(&tls_config.ca_cert_path).context(LoadTlsCertificateSnafu {
493            path: &tls_config.ca_cert_path,
494        })?;
495        let ca_cert = Certificate::from_pem(ca_cert_pem);
496        etcd_tls_opts = etcd_tls_opts.ca_certificate(ca_cert);
497    }
498
499    // If both client certificate and key paths are provided, load them and set the client identity.
500    if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() {
501        info!("Loading client certificate for mutual TLS");
502        debug!(
503            "Using client certificate from {} and key from {}",
504            tls_config.cert_path, tls_config.key_path
505        );
506        let cert_pem = fs::read(&tls_config.cert_path).context(LoadTlsCertificateSnafu {
507            path: &tls_config.cert_path,
508        })?;
509        let key_pem = fs::read(&tls_config.key_path).context(LoadTlsCertificateSnafu {
510            path: &tls_config.key_path,
511        })?;
512        let identity = Identity::from_pem(cert_pem, key_pem);
513        etcd_tls_opts = etcd_tls_opts.identity(identity);
514    }
515
516    // Always enable native TLS roots for additional trust anchors.
517    etcd_tls_opts = etcd_tls_opts.with_native_roots();
518
519    Ok(Some(etcd_tls_opts))
520}
521
522#[cfg(test)]
523mod tests {
524    use etcd_client::ConnectOptions;
525
526    use super::*;
527
528    #[test]
529    fn test_parse_get() {
530        let req = RangeRequest {
531            key: b"test_key".to_vec(),
532            range_end: b"test_range_end".to_vec(),
533            limit: 64,
534            keys_only: true,
535        };
536
537        let get: Get = req.try_into().unwrap();
538
539        assert_eq!(b"test_key".to_vec(), get.key);
540        let _ = get.options.unwrap();
541    }
542
543    #[test]
544    fn test_parse_put() {
545        let req = PutRequest {
546            key: b"test_key".to_vec(),
547            value: b"test_value".to_vec(),
548            prev_kv: true,
549        };
550
551        let put: Put = req.try_into().unwrap();
552
553        assert_eq!(b"test_key".to_vec(), put.key);
554        assert_eq!(b"test_value".to_vec(), put.value);
555        let _ = put.options.unwrap();
556    }
557
558    #[test]
559    fn test_parse_batch_get() {
560        let req = BatchGetRequest {
561            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
562        };
563
564        let batch_get: BatchGet = req.try_into().unwrap();
565        let keys = batch_get.keys;
566
567        assert_eq!(b"k1".to_vec(), keys.first().unwrap().clone());
568        assert_eq!(b"k2".to_vec(), keys.get(1).unwrap().clone());
569        assert_eq!(b"k3".to_vec(), keys.get(2).unwrap().clone());
570    }
571
572    #[test]
573    fn test_parse_batch_put() {
574        let req = BatchPutRequest {
575            kvs: vec![KeyValue {
576                key: b"test_key".to_vec(),
577                value: b"test_value".to_vec(),
578            }],
579            prev_kv: true,
580        };
581
582        let batch_put: BatchPut = req.try_into().unwrap();
583
584        let kv = batch_put.kvs.first().unwrap();
585        assert_eq!(b"test_key", kv.key());
586        assert_eq!(b"test_value", kv.value());
587        let _ = batch_put.options.unwrap();
588    }
589
590    #[test]
591    fn test_parse_batch_delete() {
592        let req = BatchDeleteRequest {
593            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
594            prev_kv: true,
595        };
596
597        let batch_delete: BatchDelete = req.try_into().unwrap();
598
599        assert_eq!(batch_delete.keys.len(), 3);
600        assert_eq!(b"k1".to_vec(), batch_delete.keys.first().unwrap().clone());
601        assert_eq!(b"k2".to_vec(), batch_delete.keys.get(1).unwrap().clone());
602        assert_eq!(b"k3".to_vec(), batch_delete.keys.get(2).unwrap().clone());
603        let _ = batch_delete.options.unwrap();
604    }
605
606    #[test]
607    fn test_parse_delete() {
608        let req = DeleteRangeRequest {
609            key: b"test_key".to_vec(),
610            range_end: b"test_range_end".to_vec(),
611            prev_kv: true,
612        };
613
614        let delete: Delete = req.try_into().unwrap();
615
616        assert_eq!(b"test_key".to_vec(), delete.key);
617        let _ = delete.options.unwrap();
618    }
619
620    use crate::kv_backend::test::{
621        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
622        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
623        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
624        test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
625        test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
626        unprepare_kv,
627    };
628    use crate::maybe_skip_etcd_tls_integration_test;
629    use crate::test_util::etcd_certs_dir;
630
631    async fn build_kv_backend() -> Option<EtcdStore> {
632        let endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
633        if endpoints.is_empty() {
634            return None;
635        }
636
637        let endpoints = endpoints
638            .split(',')
639            .map(|s| s.to_string())
640            .collect::<Vec<String>>();
641
642        let client = Client::connect(endpoints, None)
643            .await
644            .expect("malformed endpoints");
645
646        Some(EtcdStore {
647            client,
648            max_txn_ops: 128,
649            max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
650        })
651    }
652
653    #[tokio::test]
654    async fn test_put() {
655        if let Some(kv_backend) = build_kv_backend().await {
656            let prefix = b"put/";
657            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
658            test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
659            unprepare_kv(&kv_backend, prefix).await;
660        }
661    }
662
663    #[tokio::test]
664    async fn test_range() {
665        if let Some(kv_backend) = build_kv_backend().await {
666            let prefix = b"range/";
667            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
668            test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
669            unprepare_kv(&kv_backend, prefix).await;
670        }
671    }
672
673    #[tokio::test]
674    async fn test_range_2() {
675        if let Some(kv_backend) = build_kv_backend().await {
676            test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await;
677        }
678    }
679
680    #[tokio::test]
681    async fn test_batch_get() {
682        if let Some(kv_backend) = build_kv_backend().await {
683            let prefix = b"batchGet/";
684            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
685            test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
686            unprepare_kv(&kv_backend, prefix).await;
687        }
688    }
689
690    #[tokio::test(flavor = "multi_thread")]
691    async fn test_compare_and_put() {
692        if let Some(kv_backend) = build_kv_backend().await {
693            let kv_backend = Arc::new(kv_backend);
694            test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
695        }
696    }
697
698    #[tokio::test]
699    async fn test_delete_range() {
700        if let Some(kv_backend) = build_kv_backend().await {
701            let prefix = b"deleteRange/";
702            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
703            test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
704            unprepare_kv(&kv_backend, prefix).await;
705        }
706    }
707
708    #[tokio::test]
709    async fn test_batch_delete() {
710        if let Some(kv_backend) = build_kv_backend().await {
711            let prefix = b"batchDelete/";
712            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
713            test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
714            unprepare_kv(&kv_backend, prefix).await;
715        }
716    }
717
718    #[tokio::test]
719    async fn test_etcd_txn() {
720        if let Some(kv_backend) = build_kv_backend().await {
721            test_txn_one_compare_op(&kv_backend).await;
722            text_txn_multi_compare_op(&kv_backend).await;
723            test_txn_compare_equal(&kv_backend).await;
724            test_txn_compare_greater(&kv_backend).await;
725            test_txn_compare_less(&kv_backend).await;
726            test_txn_compare_not_equal(&kv_backend).await;
727        }
728    }
729
730    async fn create_etcd_client_with_tls(endpoints: &[String], tls_config: &TlsOption) -> Client {
731        let endpoints = endpoints
732            .iter()
733            .map(|s| s.trim())
734            .filter(|s| !s.is_empty())
735            .collect::<Vec<_>>();
736        let connect_options =
737            ConnectOptions::new().with_tls(create_etcd_tls_options(tls_config).unwrap().unwrap());
738
739        Client::connect(&endpoints, Some(connect_options))
740            .await
741            .unwrap()
742    }
743
744    #[tokio::test]
745    async fn test_create_etcd_client_with_mtls_and_ca() {
746        maybe_skip_etcd_tls_integration_test!();
747        let endpoints = std::env::var("GT_ETCD_TLS_ENDPOINTS")
748            .unwrap()
749            .split(',')
750            .map(|s| s.to_string())
751            .collect::<Vec<_>>();
752
753        let cert_dir = etcd_certs_dir();
754        let tls_config = TlsOption {
755            mode: TlsMode::Require,
756            ca_cert_path: cert_dir.join("ca.crt").to_string_lossy().to_string(),
757            cert_path: cert_dir.join("client.crt").to_string_lossy().to_string(),
758            key_path: cert_dir
759                .join("client-key.pem")
760                .to_string_lossy()
761                .to_string(),
762        };
763        let mut client = create_etcd_client_with_tls(&endpoints, &tls_config).await;
764        let _ = client.get(b"hello", None).await.unwrap();
765    }
766}