1use std::any::Any;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use etcd_client::{
20 Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
21};
22use snafu::{ensure, ResultExt};
23
24use crate::error::{self, Error, Result};
25use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
26use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
27use crate::metrics::METRIC_META_TXN_REQUEST;
28use crate::rpc::store::{
29 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31 RangeRequest, RangeResponse,
32};
33use crate::rpc::KeyValue;
34
/// A key-value backend that stores its data in etcd.
pub struct EtcdStore {
    /// Client used to issue every request to etcd.
    client: Client,
    /// Upper bound on the number of operations placed into a single etcd transaction by the
    /// batch operations; larger batches are split into several transactions.
    max_txn_ops: usize,
}
43
44impl EtcdStore {
45 pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
46 where
47 E: AsRef<str>,
48 S: AsRef<[E]>,
49 {
50 let client = Client::connect(endpoints, None)
51 .await
52 .context(error::ConnectEtcdSnafu)?;
53
54 Ok(Self::with_etcd_client(client, max_txn_ops))
55 }
56
57 pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
58 info!("Connected to etcd");
59 Arc::new(Self {
60 client,
61 max_txn_ops,
62 })
63 }
64
65 async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
66 let max_txn_ops = self.max_txn_ops();
67 if txn_ops.len() < max_txn_ops {
68 let _timer = METRIC_META_TXN_REQUEST
70 .with_label_values(&["etcd", "txn"])
71 .start_timer();
72 let txn = Txn::new().and_then(txn_ops);
73 let txn_res = self
74 .client
75 .kv_client()
76 .txn(txn)
77 .await
78 .context(error::EtcdFailedSnafu)?;
79 return Ok(vec![txn_res]);
80 }
81
82 let txns = txn_ops
83 .chunks(max_txn_ops)
84 .map(|part| async move {
85 let _timer = METRIC_META_TXN_REQUEST
86 .with_label_values(&["etcd", "txn"])
87 .start_timer();
88 let txn = Txn::new().and_then(part);
89 self.client.kv_client().txn(txn).await
90 })
91 .collect::<Vec<_>>();
92
93 futures::future::try_join_all(txns)
94 .await
95 .context(error::EtcdFailedSnafu)
96 }
97}
98
99#[async_trait::async_trait]
100impl KvBackend for EtcdStore {
101 fn name(&self) -> &str {
102 "Etcd"
103 }
104
105 fn as_any(&self) -> &dyn Any {
106 self
107 }
108
109 async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
110 let Get { key, options } = req.try_into()?;
111
112 let mut res = self
113 .client
114 .kv_client()
115 .get(key, options)
116 .await
117 .context(error::EtcdFailedSnafu)?;
118
119 let kvs = res
120 .take_kvs()
121 .into_iter()
122 .map(KeyValue::from)
123 .collect::<Vec<_>>();
124
125 Ok(RangeResponse {
126 kvs,
127 more: res.more(),
128 })
129 }
130
131 async fn put(&self, req: PutRequest) -> Result<PutResponse> {
132 let Put {
133 key,
134 value,
135 options,
136 } = req.try_into()?;
137
138 let mut res = self
139 .client
140 .kv_client()
141 .put(key, value, options)
142 .await
143 .context(error::EtcdFailedSnafu)?;
144
145 let prev_kv = res.take_prev_key().map(KeyValue::from);
146 Ok(PutResponse { prev_kv })
147 }
148
149 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
150 let BatchPut { kvs, options } = req.try_into()?;
151
152 let put_ops = kvs
153 .into_iter()
154 .map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
155 .collect::<Vec<_>>();
156
157 let txn_responses = self.do_multi_txn(put_ops).await?;
158
159 let mut prev_kvs = vec![];
160 for txn_res in txn_responses {
161 for op_res in txn_res.op_responses() {
162 match op_res {
163 TxnOpResponse::Put(mut put_res) => {
164 if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
165 prev_kvs.push(prev_kv);
166 }
167 }
168 _ => unreachable!(),
169 }
170 }
171 }
172
173 Ok(BatchPutResponse { prev_kvs })
174 }
175
176 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
177 let BatchGet { keys, options } = req.try_into()?;
178
179 let get_ops: Vec<_> = keys
180 .into_iter()
181 .map(|key| TxnOp::get(key, options.clone()))
182 .collect();
183
184 let txn_responses = self.do_multi_txn(get_ops).await?;
185
186 let mut kvs = vec![];
187 for txn_res in txn_responses {
188 for op_res in txn_res.op_responses() {
189 let mut get_res = match op_res {
190 TxnOpResponse::Get(get_res) => get_res,
191 _ => unreachable!(),
192 };
193 kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
194 }
195 }
196
197 Ok(BatchGetResponse { kvs })
198 }
199
200 async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
201 let Delete { key, options } = req.try_into()?;
202
203 let mut res = self
204 .client
205 .kv_client()
206 .delete(key, options)
207 .await
208 .context(error::EtcdFailedSnafu)?;
209
210 let prev_kvs = res
211 .take_prev_kvs()
212 .into_iter()
213 .map(KeyValue::from)
214 .collect::<Vec<_>>();
215
216 Ok(DeleteRangeResponse {
217 deleted: res.deleted(),
218 prev_kvs,
219 })
220 }
221
222 async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
223 let BatchDelete { keys, options } = req.try_into()?;
224
225 let mut prev_kvs = Vec::with_capacity(keys.len());
226
227 let delete_ops = keys
228 .into_iter()
229 .map(|key| TxnOp::delete(key, options.clone()))
230 .collect::<Vec<_>>();
231
232 let txn_responses = self.do_multi_txn(delete_ops).await?;
233
234 for txn_res in txn_responses {
235 for op_res in txn_res.op_responses() {
236 match op_res {
237 TxnOpResponse::Delete(mut delete_res) => {
238 delete_res
239 .take_prev_kvs()
240 .into_iter()
241 .map(KeyValue::from)
242 .for_each(|kv| {
243 prev_kvs.push(kv);
244 });
245 }
246 _ => unreachable!(),
247 }
248 }
249 }
250
251 Ok(BatchDeleteResponse { prev_kvs })
252 }
253}
254
255#[async_trait::async_trait]
256impl TxnService for EtcdStore {
257 type Error = Error;
258
259 async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
260 let _timer = METRIC_META_TXN_REQUEST
261 .with_label_values(&["etcd", "txn"])
262 .start_timer();
263
264 let max_operations = txn.max_operations();
265
266 let etcd_txn: Txn = txn.into();
267 let txn_res = self
268 .client
269 .kv_client()
270 .txn(etcd_txn)
271 .await
272 .context(error::EtcdTxnFailedSnafu { max_operations })?;
273 txn_res.try_into()
274 }
275
276 fn max_txn_ops(&self) -> usize {
277 self.max_txn_ops
278 }
279}
280
/// Arguments for an etcd `get` call, built from a [`RangeRequest`].
struct Get {
    /// Key to look up; the start of the range when the options carry a range end.
    key: Vec<u8>,
    /// Get options derived from the request (range end, limit, keys-only).
    options: Option<GetOptions>,
}
285
286impl TryFrom<RangeRequest> for Get {
287 type Error = Error;
288
289 fn try_from(req: RangeRequest) -> Result<Self> {
290 let RangeRequest {
291 key,
292 range_end,
293 limit,
294 keys_only,
295 } = req;
296
297 ensure!(!key.is_empty(), error::EmptyKeySnafu);
298
299 let mut options = GetOptions::default();
300 if !range_end.is_empty() {
301 options = options.with_range(range_end);
302 if limit > 0 {
303 options = options.with_limit(limit);
304 }
305 }
306 if keys_only {
307 options = options.with_keys_only();
308 }
309
310 Ok(Get {
311 key,
312 options: Some(options),
313 })
314 }
315}
316
/// Arguments for an etcd `put` call, built from a [`PutRequest`].
struct Put {
    /// Key to write.
    key: Vec<u8>,
    /// Value to store under `key`.
    value: Vec<u8>,
    /// Put options derived from the request (whether to return the previous pair).
    options: Option<PutOptions>,
}
322
323impl TryFrom<PutRequest> for Put {
324 type Error = Error;
325
326 fn try_from(req: PutRequest) -> Result<Self> {
327 let PutRequest {
328 key,
329 value,
330 prev_kv,
331 } = req;
332
333 let mut options = PutOptions::default();
334 if prev_kv {
335 options = options.with_prev_key();
336 }
337
338 Ok(Put {
339 key,
340 value,
341 options: Some(options),
342 })
343 }
344}
345
/// Arguments for a batch of etcd `get` operations, built from a [`BatchGetRequest`].
struct BatchGet {
    /// Keys to look up, one get operation per key.
    keys: Vec<Vec<u8>>,
    /// Get options shared by every operation in the batch.
    options: Option<GetOptions>,
}
350
351impl TryFrom<BatchGetRequest> for BatchGet {
352 type Error = Error;
353
354 fn try_from(req: BatchGetRequest) -> Result<Self> {
355 let BatchGetRequest { keys } = req;
356
357 let options = GetOptions::default();
358
359 Ok(BatchGet {
360 keys,
361 options: Some(options),
362 })
363 }
364}
365
/// Arguments for a batch of etcd `put` operations, built from a [`BatchPutRequest`].
struct BatchPut {
    /// Key-value pairs to write, one put operation per pair.
    kvs: Vec<KeyValue>,
    /// Put options shared by every operation in the batch.
    options: Option<PutOptions>,
}
370
371impl TryFrom<BatchPutRequest> for BatchPut {
372 type Error = Error;
373
374 fn try_from(req: BatchPutRequest) -> Result<Self> {
375 let BatchPutRequest { kvs, prev_kv } = req;
376
377 let mut options = PutOptions::default();
378 if prev_kv {
379 options = options.with_prev_key();
380 }
381
382 Ok(BatchPut {
383 kvs,
384 options: Some(options),
385 })
386 }
387}
388
/// Arguments for a batch of etcd `delete` operations, built from a [`BatchDeleteRequest`].
struct BatchDelete {
    /// Keys to delete, one delete operation per key.
    keys: Vec<Vec<u8>>,
    /// Delete options shared by every operation in the batch.
    options: Option<DeleteOptions>,
}
393
394impl TryFrom<BatchDeleteRequest> for BatchDelete {
395 type Error = Error;
396
397 fn try_from(req: BatchDeleteRequest) -> Result<Self> {
398 let BatchDeleteRequest { keys, prev_kv } = req;
399
400 let mut options = DeleteOptions::default();
401 if prev_kv {
402 options = options.with_prev_key();
403 }
404
405 Ok(BatchDelete {
406 keys,
407 options: Some(options),
408 })
409 }
410}
411
/// Arguments for an etcd `delete` call, built from a [`DeleteRangeRequest`].
struct Delete {
    /// Key to delete; the start of the range when the options carry a range end.
    key: Vec<u8>,
    /// Delete options derived from the request (range end, whether to return previous pairs).
    options: Option<DeleteOptions>,
}
416
417impl TryFrom<DeleteRangeRequest> for Delete {
418 type Error = Error;
419
420 fn try_from(req: DeleteRangeRequest) -> Result<Self> {
421 let DeleteRangeRequest {
422 key,
423 range_end,
424 prev_kv,
425 } = req;
426
427 ensure!(!key.is_empty(), error::EmptyKeySnafu);
428
429 let mut options = DeleteOptions::default();
430 if !range_end.is_empty() {
431 options = options.with_range(range_end);
432 }
433 if prev_kv {
434 options = options.with_prev_key();
435 }
436
437 Ok(Delete {
438 key,
439 options: Some(options),
440 })
441 }
442}
443
/// Unit tests for the request conversions, plus integration tests that only exercise etcd when
/// the `GT_ETCD_ENDPOINTS` environment variable is set to a non-empty value.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
        test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
        unprepare_kv,
    };

    #[test]
    fn test_parse_get() {
        let get = Get::try_from(RangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            limit: 64,
            keys_only: true,
        })
        .unwrap();

        assert_eq!(get.key, b"test_key");
        assert!(get.options.is_some());
    }

    #[test]
    fn test_parse_put() {
        let put = Put::try_from(PutRequest {
            key: b"test_key".to_vec(),
            value: b"test_value".to_vec(),
            prev_kv: true,
        })
        .unwrap();

        assert_eq!(put.key, b"test_key");
        assert_eq!(put.value, b"test_value");
        assert!(put.options.is_some());
    }

    #[test]
    fn test_parse_batch_get() {
        let batch_get = BatchGet::try_from(BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        })
        .unwrap();

        let keys = batch_get.keys;
        assert_eq!(keys[0], b"k1");
        assert_eq!(keys[1], b"k2");
        assert_eq!(keys[2], b"k3");
    }

    #[test]
    fn test_parse_batch_put() {
        let batch_put = BatchPut::try_from(BatchPutRequest {
            kvs: vec![KeyValue {
                key: b"test_key".to_vec(),
                value: b"test_value".to_vec(),
            }],
            prev_kv: true,
        })
        .unwrap();

        let first = &batch_put.kvs[0];
        assert_eq!(b"test_key", first.key());
        assert_eq!(b"test_value", first.value());
        assert!(batch_put.options.is_some());
    }

    #[test]
    fn test_parse_batch_delete() {
        let batch_delete = BatchDelete::try_from(BatchDeleteRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
            prev_kv: true,
        })
        .unwrap();

        let keys = &batch_delete.keys;
        assert_eq!(keys.len(), 3);
        assert_eq!(keys[0], b"k1");
        assert_eq!(keys[1], b"k2");
        assert_eq!(keys[2], b"k3");
        assert!(batch_delete.options.is_some());
    }

    #[test]
    fn test_parse_delete() {
        let delete = Delete::try_from(DeleteRangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            prev_kv: true,
        })
        .unwrap();

        assert_eq!(delete.key, b"test_key");
        assert!(delete.options.is_some());
    }

    /// Builds a store from the comma-separated endpoints in `GT_ETCD_ENDPOINTS`.
    ///
    /// Returns `None` when the variable is unset or empty, which lets the integration tests
    /// below do nothing on machines without etcd.
    async fn build_kv_backend() -> Option<EtcdStore> {
        let raw = std::env::var("GT_ETCD_ENDPOINTS")
            .ok()
            .filter(|value| !value.is_empty())?;

        let endpoints: Vec<String> = raw.split(',').map(String::from).collect();

        let client = Client::connect(endpoints, None)
            .await
            .expect("malformed endpoints");

        Some(EtcdStore {
            client,
            max_txn_ops: 128,
        })
    }

    #[tokio::test]
    async fn test_put() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        let prefix = b"put/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_put_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_range() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        let prefix = b"range/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_range_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_range_2() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        test_kv_range_2_with_prefix(&store, b"range2/".to_vec()).await;
    }

    #[tokio::test]
    async fn test_batch_get() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        let prefix = b"batchGet/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_batch_get_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        let shared = Arc::new(store);
        test_kv_compare_and_put_with_prefix(shared, b"compareAndPut/".to_vec()).await;
    }

    #[tokio::test]
    async fn test_delete_range() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        let prefix = b"deleteRange/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_delete_range_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        let prefix = b"batchDelete/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_batch_delete_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_etcd_txn() {
        let Some(store) = build_kv_backend().await else {
            return;
        };

        test_txn_one_compare_op(&store).await;
        text_txn_multi_compare_op(&store).await;
        test_txn_compare_equal(&store).await;
        test_txn_compare_greater(&store).await;
        test_txn_compare_less(&store).await;
        test_txn_compare_not_equal(&store).await;
    }
}