use std::any::Any;
use std::fs;
use std::sync::Arc;

use common_telemetry::{debug, info, warn};
use etcd_client::{
    Certificate, Client, DeleteOptions, GetOptions, Identity, PutOptions, TlsOptions, Txn, TxnOp,
    TxnOpResponse, TxnResponse,
};
use snafu::{ResultExt, ensure};

use crate::error::{self, Error, LoadTlsCertificateSnafu, Result};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::KeyValue;
use crate::rpc::store::{
    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
    RangeRequest, RangeResponse,
};
36
/// Default value (32 MiB) passed to `max_decoding_message_size` on every etcd KV client.
const DEFAULT_MAX_DECODING_SIZE: usize = 32 * 1024 * 1024;

/// A [`KvBackend`] implementation that stores its data in etcd.
pub struct EtcdStore {
    /// Connected etcd client; a KV client is derived from it for every request.
    client: Client,
    /// Maximum number of operations sent in a single etcd transaction. Batch requests
    /// with more operations are split into several transactions.
    max_txn_ops: usize,
    /// Maximum decoding message size applied to each KV client created by this store.
    max_decoding_size: usize,
}
49
50impl EtcdStore {
51 pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
52 where
53 E: AsRef<str>,
54 S: AsRef<[E]>,
55 {
56 let client = Client::connect(endpoints, None)
57 .await
58 .context(error::ConnectEtcdSnafu)?;
59
60 Ok(Self::with_etcd_client(client, max_txn_ops))
61 }
62
63 pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
64 info!("Connected to etcd");
65 Arc::new(Self {
66 client,
67 max_txn_ops,
68 max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
69 })
70 }
71
72 pub fn set_max_decoding_size(&mut self, max_decoding_size: usize) {
73 self.max_decoding_size = max_decoding_size;
74 }
75
76 fn kv_client(&self) -> etcd_client::KvClient {
77 self.client
78 .kv_client()
79 .max_decoding_message_size(self.max_decoding_size)
80 }
81
82 async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
83 let max_txn_ops = self.max_txn_ops();
84 if txn_ops.len() < max_txn_ops {
85 let _timer = METRIC_META_TXN_REQUEST
87 .with_label_values(&["etcd", "txn"])
88 .start_timer();
89 let txn = Txn::new().and_then(txn_ops);
90 let txn_res = self
91 .kv_client()
92 .txn(txn)
93 .await
94 .context(error::EtcdFailedSnafu)?;
95 return Ok(vec![txn_res]);
96 }
97
98 let txns = txn_ops
99 .chunks(max_txn_ops)
100 .map(|part| async move {
101 let _timer = METRIC_META_TXN_REQUEST
102 .with_label_values(&["etcd", "txn"])
103 .start_timer();
104 let txn = Txn::new().and_then(part);
105 self.kv_client().txn(txn).await
106 })
107 .collect::<Vec<_>>();
108
109 futures::future::try_join_all(txns)
110 .await
111 .context(error::EtcdFailedSnafu)
112 }
113}
114
115#[async_trait::async_trait]
116impl KvBackend for EtcdStore {
117 fn name(&self) -> &str {
118 "Etcd"
119 }
120
121 fn as_any(&self) -> &dyn Any {
122 self
123 }
124
125 async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
126 let Get { key, options } = req.try_into()?;
127
128 let mut res = self
129 .kv_client()
130 .get(key, options)
131 .await
132 .context(error::EtcdFailedSnafu)?;
133
134 let kvs = res
135 .take_kvs()
136 .into_iter()
137 .map(KeyValue::from)
138 .collect::<Vec<_>>();
139
140 Ok(RangeResponse {
141 kvs,
142 more: res.more(),
143 })
144 }
145
146 async fn put(&self, req: PutRequest) -> Result<PutResponse> {
147 let Put {
148 key,
149 value,
150 options,
151 } = req.try_into()?;
152
153 let mut res = self
154 .kv_client()
155 .put(key, value, options)
156 .await
157 .context(error::EtcdFailedSnafu)?;
158
159 let prev_kv = res.take_prev_key().map(KeyValue::from);
160 Ok(PutResponse { prev_kv })
161 }
162
163 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
164 let BatchPut { kvs, options } = req.try_into()?;
165
166 let put_ops = kvs
167 .into_iter()
168 .map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
169 .collect::<Vec<_>>();
170
171 let txn_responses = self.do_multi_txn(put_ops).await?;
172
173 let mut prev_kvs = vec![];
174 for txn_res in txn_responses {
175 for op_res in txn_res.op_responses() {
176 match op_res {
177 TxnOpResponse::Put(mut put_res) => {
178 if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
179 prev_kvs.push(prev_kv);
180 }
181 }
182 _ => unreachable!(),
183 }
184 }
185 }
186
187 Ok(BatchPutResponse { prev_kvs })
188 }
189
190 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
191 let BatchGet { keys, options } = req.try_into()?;
192
193 let get_ops: Vec<_> = keys
194 .into_iter()
195 .map(|key| TxnOp::get(key, options.clone()))
196 .collect();
197
198 let txn_responses = self.do_multi_txn(get_ops).await?;
199
200 let mut kvs = vec![];
201 for txn_res in txn_responses {
202 for op_res in txn_res.op_responses() {
203 let mut get_res = match op_res {
204 TxnOpResponse::Get(get_res) => get_res,
205 _ => unreachable!(),
206 };
207 kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
208 }
209 }
210
211 Ok(BatchGetResponse { kvs })
212 }
213
214 async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
215 let Delete { key, options } = req.try_into()?;
216
217 let mut res = self
218 .kv_client()
219 .delete(key, options)
220 .await
221 .context(error::EtcdFailedSnafu)?;
222
223 let prev_kvs = res
224 .take_prev_kvs()
225 .into_iter()
226 .map(KeyValue::from)
227 .collect::<Vec<_>>();
228
229 Ok(DeleteRangeResponse {
230 deleted: res.deleted(),
231 prev_kvs,
232 })
233 }
234
235 async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
236 let BatchDelete { keys, options } = req.try_into()?;
237
238 let mut prev_kvs = Vec::with_capacity(keys.len());
239
240 let delete_ops = keys
241 .into_iter()
242 .map(|key| TxnOp::delete(key, options.clone()))
243 .collect::<Vec<_>>();
244
245 let txn_responses = self.do_multi_txn(delete_ops).await?;
246
247 for txn_res in txn_responses {
248 for op_res in txn_res.op_responses() {
249 match op_res {
250 TxnOpResponse::Delete(mut delete_res) => {
251 delete_res
252 .take_prev_kvs()
253 .into_iter()
254 .map(KeyValue::from)
255 .for_each(|kv| {
256 prev_kvs.push(kv);
257 });
258 }
259 _ => unreachable!(),
260 }
261 }
262 }
263
264 Ok(BatchDeleteResponse { prev_kvs })
265 }
266}
267
268#[async_trait::async_trait]
269impl TxnService for EtcdStore {
270 type Error = Error;
271
272 async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
273 let _timer = METRIC_META_TXN_REQUEST
274 .with_label_values(&["etcd", "txn"])
275 .start_timer();
276
277 let max_operations = txn.max_operations();
278
279 let etcd_txn: Txn = txn.into();
280 let txn_res = self
281 .kv_client()
282 .txn(etcd_txn)
283 .await
284 .context(error::EtcdTxnFailedSnafu { max_operations })?;
285 txn_res.try_into()
286 }
287
288 fn max_txn_ops(&self) -> usize {
289 self.max_txn_ops
290 }
291}
292
293struct Get {
294 key: Vec<u8>,
295 options: Option<GetOptions>,
296}
297
298impl TryFrom<RangeRequest> for Get {
299 type Error = Error;
300
301 fn try_from(req: RangeRequest) -> Result<Self> {
302 let RangeRequest {
303 key,
304 range_end,
305 limit,
306 keys_only,
307 } = req;
308
309 ensure!(!key.is_empty(), error::EmptyKeySnafu);
310
311 let mut options = GetOptions::default();
312 if !range_end.is_empty() {
313 options = options.with_range(range_end);
314 if limit > 0 {
315 options = options.with_limit(limit);
316 }
317 }
318 if keys_only {
319 options = options.with_keys_only();
320 }
321
322 Ok(Get {
323 key,
324 options: Some(options),
325 })
326 }
327}
328
329struct Put {
330 key: Vec<u8>,
331 value: Vec<u8>,
332 options: Option<PutOptions>,
333}
334
335impl TryFrom<PutRequest> for Put {
336 type Error = Error;
337
338 fn try_from(req: PutRequest) -> Result<Self> {
339 let PutRequest {
340 key,
341 value,
342 prev_kv,
343 } = req;
344
345 let mut options = PutOptions::default();
346 if prev_kv {
347 options = options.with_prev_key();
348 }
349
350 Ok(Put {
351 key,
352 value,
353 options: Some(options),
354 })
355 }
356}
357
358struct BatchGet {
359 keys: Vec<Vec<u8>>,
360 options: Option<GetOptions>,
361}
362
363impl TryFrom<BatchGetRequest> for BatchGet {
364 type Error = Error;
365
366 fn try_from(req: BatchGetRequest) -> Result<Self> {
367 let BatchGetRequest { keys } = req;
368
369 let options = GetOptions::default();
370
371 Ok(BatchGet {
372 keys,
373 options: Some(options),
374 })
375 }
376}
377
378struct BatchPut {
379 kvs: Vec<KeyValue>,
380 options: Option<PutOptions>,
381}
382
383impl TryFrom<BatchPutRequest> for BatchPut {
384 type Error = Error;
385
386 fn try_from(req: BatchPutRequest) -> Result<Self> {
387 let BatchPutRequest { kvs, prev_kv } = req;
388
389 let mut options = PutOptions::default();
390 if prev_kv {
391 options = options.with_prev_key();
392 }
393
394 Ok(BatchPut {
395 kvs,
396 options: Some(options),
397 })
398 }
399}
400
401struct BatchDelete {
402 keys: Vec<Vec<u8>>,
403 options: Option<DeleteOptions>,
404}
405
406impl TryFrom<BatchDeleteRequest> for BatchDelete {
407 type Error = Error;
408
409 fn try_from(req: BatchDeleteRequest) -> Result<Self> {
410 let BatchDeleteRequest { keys, prev_kv } = req;
411
412 let mut options = DeleteOptions::default();
413 if prev_kv {
414 options = options.with_prev_key();
415 }
416
417 Ok(BatchDelete {
418 keys,
419 options: Some(options),
420 })
421 }
422}
423
424struct Delete {
425 key: Vec<u8>,
426 options: Option<DeleteOptions>,
427}
428
429impl TryFrom<DeleteRangeRequest> for Delete {
430 type Error = Error;
431
432 fn try_from(req: DeleteRangeRequest) -> Result<Self> {
433 let DeleteRangeRequest {
434 key,
435 range_end,
436 prev_kv,
437 } = req;
438
439 ensure!(!key.is_empty(), error::EmptyKeySnafu);
440
441 let mut options = DeleteOptions::default();
442 if !range_end.is_empty() {
443 options = options.with_range(range_end);
444 }
445 if prev_kv {
446 options = options.with_prev_key();
447 }
448
449 Ok(Delete {
450 key,
451 options: Some(options),
452 })
453 }
454}
455
/// TLS mode used when connecting to etcd.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TlsMode {
    /// TLS is disabled (the default); `create_etcd_tls_options` returns `None`.
    #[default]
    Disable,
    /// TLS is required; `create_etcd_tls_options` builds TLS options.
    Require,
}
462
/// TLS configuration consumed by `create_etcd_tls_options`.
///
/// Empty path strings mean "not configured".
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TlsOption {
    /// Whether TLS is used at all.
    pub mode: TlsMode,
    /// Path to a PEM-encoded client certificate. It is only used for mutual TLS,
    /// together with `key_path`.
    pub cert_path: String,
    /// Path to the PEM-encoded private key for `cert_path`.
    pub key_path: String,
    /// Path to a PEM-encoded CA certificate added to the TLS options.
    pub ca_cert_path: String,
}
471
472pub fn create_etcd_tls_options(tls_config: &TlsOption) -> Result<Option<TlsOptions>> {
480 if matches!(tls_config.mode, TlsMode::Disable) {
482 return Ok(None);
483 }
484
485 info!("Creating etcd TLS with mode: {:?}", tls_config.mode);
486 let mut etcd_tls_opts = TlsOptions::new();
488
489 if !tls_config.ca_cert_path.is_empty() {
491 debug!("Using CA certificate from {}", tls_config.ca_cert_path);
492 let ca_cert_pem = fs::read(&tls_config.ca_cert_path).context(LoadTlsCertificateSnafu {
493 path: &tls_config.ca_cert_path,
494 })?;
495 let ca_cert = Certificate::from_pem(ca_cert_pem);
496 etcd_tls_opts = etcd_tls_opts.ca_certificate(ca_cert);
497 }
498
499 if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() {
501 info!("Loading client certificate for mutual TLS");
502 debug!(
503 "Using client certificate from {} and key from {}",
504 tls_config.cert_path, tls_config.key_path
505 );
506 let cert_pem = fs::read(&tls_config.cert_path).context(LoadTlsCertificateSnafu {
507 path: &tls_config.cert_path,
508 })?;
509 let key_pem = fs::read(&tls_config.key_path).context(LoadTlsCertificateSnafu {
510 path: &tls_config.key_path,
511 })?;
512 let identity = Identity::from_pem(cert_pem, key_pem);
513 etcd_tls_opts = etcd_tls_opts.identity(identity);
514 }
515
516 etcd_tls_opts = etcd_tls_opts.with_native_roots();
518
519 Ok(Some(etcd_tls_opts))
520}
521
// Tests for `EtcdStore`.
//
// The `test_parse_*` tests exercise the request-to-etcd argument conversions and need
// no server. The async tests talk to a real etcd cluster: `build_kv_backend` returns
// `None` when `GT_ETCD_ENDPOINTS` is unset or empty, and those tests then do nothing.
// The TLS test uses `maybe_skip_etcd_tls_integration_test!`, which presumably skips it
// when the TLS test environment is not configured (TODO confirm).
#[cfg(test)]
mod tests {
    use etcd_client::ConnectOptions;

    use super::*;

    // `GetOptions` exposes no accessors, so these parse tests only check that options
    // were produced, not their contents.
    #[test]
    fn test_parse_get() {
        let req = RangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            limit: 64,
            keys_only: true,
        };

        let get: Get = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), get.key);
        let _ = get.options.unwrap();
    }

    #[test]
    fn test_parse_put() {
        let req = PutRequest {
            key: b"test_key".to_vec(),
            value: b"test_value".to_vec(),
            prev_kv: true,
        };

        let put: Put = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), put.key);
        assert_eq!(b"test_value".to_vec(), put.value);
        let _ = put.options.unwrap();
    }

    #[test]
    fn test_parse_batch_get() {
        let req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        };

        let batch_get: BatchGet = req.try_into().unwrap();
        let keys = batch_get.keys;

        // Keys must keep their request order.
        assert_eq!(b"k1".to_vec(), keys.first().unwrap().clone());
        assert_eq!(b"k2".to_vec(), keys.get(1).unwrap().clone());
        assert_eq!(b"k3".to_vec(), keys.get(2).unwrap().clone());
    }

    #[test]
    fn test_parse_batch_put() {
        let req = BatchPutRequest {
            kvs: vec![KeyValue {
                key: b"test_key".to_vec(),
                value: b"test_value".to_vec(),
            }],
            prev_kv: true,
        };

        let batch_put: BatchPut = req.try_into().unwrap();

        let kv = batch_put.kvs.first().unwrap();
        assert_eq!(b"test_key", kv.key());
        assert_eq!(b"test_value", kv.value());
        let _ = batch_put.options.unwrap();
    }

    #[test]
    fn test_parse_batch_delete() {
        let req = BatchDeleteRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
            prev_kv: true,
        };

        let batch_delete: BatchDelete = req.try_into().unwrap();

        assert_eq!(batch_delete.keys.len(), 3);
        assert_eq!(b"k1".to_vec(), batch_delete.keys.first().unwrap().clone());
        assert_eq!(b"k2".to_vec(), batch_delete.keys.get(1).unwrap().clone());
        assert_eq!(b"k3".to_vec(), batch_delete.keys.get(2).unwrap().clone());
        let _ = batch_delete.options.unwrap();
    }

    #[test]
    fn test_parse_delete() {
        let req = DeleteRangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            prev_kv: true,
        };

        let delete: Delete = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), delete.key);
        let _ = delete.options.unwrap();
    }

    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
        test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
        unprepare_kv,
    };
    use crate::maybe_skip_etcd_tls_integration_test;
    use crate::test_util::etcd_certs_dir;

    /// Builds a store from the comma-separated `GT_ETCD_ENDPOINTS` variable.
    ///
    /// Returns `None` when the variable is unset or empty. Panics if connecting fails.
    async fn build_kv_backend() -> Option<EtcdStore> {
        let endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        if endpoints.is_empty() {
            return None;
        }

        let endpoints = endpoints
            .split(',')
            .map(|s| s.to_string())
            .collect::<Vec<String>>();

        let client = Client::connect(endpoints, None)
            .await
            .expect("malformed endpoints");

        Some(EtcdStore {
            client,
            max_txn_ops: 128,
            max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
        })
    }

    // Each integration test uses its own key prefix so tests sharing one etcd
    // cluster do not interfere. Where data is prepared, it is removed again with
    // `unprepare_kv`.
    #[tokio::test]
    async fn test_put() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"put/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_range() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"range/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_range_2() {
        if let Some(kv_backend) = build_kv_backend().await {
            test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await;
        }
    }

    #[tokio::test]
    async fn test_batch_get() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"batchGet/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    // The shared helper takes the backend by `Arc`; the multi-threaded runtime is
    // presumably needed for concurrent callers inside it (TODO confirm).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put() {
        if let Some(kv_backend) = build_kv_backend().await {
            let kv_backend = Arc::new(kv_backend);
            test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
        }
    }

    #[tokio::test]
    async fn test_delete_range() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"deleteRange/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_batch_delete() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"batchDelete/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    // Runs the shared transaction-compare scenarios against etcd, one after another.
    #[tokio::test]
    async fn test_etcd_txn() {
        if let Some(kv_backend) = build_kv_backend().await {
            test_txn_one_compare_op(&kv_backend).await;
            text_txn_multi_compare_op(&kv_backend).await;
            test_txn_compare_equal(&kv_backend).await;
            test_txn_compare_greater(&kv_backend).await;
            test_txn_compare_less(&kv_backend).await;
            test_txn_compare_not_equal(&kv_backend).await;
        }
    }

    /// Connects to `endpoints` (trimmed, blanks dropped) using TLS built from
    /// `tls_config`. Panics if TLS is disabled in `tls_config` or the connection fails.
    async fn create_etcd_client_with_tls(endpoints: &[String], tls_config: &TlsOption) -> Client {
        let endpoints = endpoints
            .iter()
            .map(|s| s.trim())
            .filter(|s| !s.is_empty())
            .collect::<Vec<_>>();
        let connect_options =
            ConnectOptions::new().with_tls(create_etcd_tls_options(tls_config).unwrap().unwrap());

        Client::connect(&endpoints, Some(connect_options))
            .await
            .unwrap()
    }

    // Mutual-TLS smoke test: connect with CA plus client cert/key from the test cert
    // directory and issue one read.
    #[tokio::test]
    async fn test_create_etcd_client_with_mtls_and_ca() {
        maybe_skip_etcd_tls_integration_test!();
        let endpoints = std::env::var("GT_ETCD_TLS_ENDPOINTS")
            .unwrap()
            .split(',')
            .map(|s| s.to_string())
            .collect::<Vec<_>>();

        let cert_dir = etcd_certs_dir();
        let tls_config = TlsOption {
            mode: TlsMode::Require,
            ca_cert_path: cert_dir.join("ca.crt").to_string_lossy().to_string(),
            cert_path: cert_dir.join("client.crt").to_string_lossy().to_string(),
            key_path: cert_dir
                .join("client-key.pem")
                .to_string_lossy()
                .to_string(),
        };
        let mut client = create_etcd_client_with_tls(&endpoints, &tls_config).await;
        let _ = client.get(b"hello", None).await.unwrap();
    }
}