1use std::any::Any;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use etcd_client::{
20 Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
21};
22use snafu::{ResultExt, ensure};
23
24use crate::error::{self, Error, Result};
25use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
26use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
27use crate::metrics::METRIC_META_TXN_REQUEST;
28use crate::rpc::KeyValue;
29use crate::rpc::store::{
30 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
31 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
32 RangeRequest, RangeResponse,
33};
34
/// Default cap, in bytes, on the size of a single message decoded from etcd (32 MiB).
const DEFAULT_MAX_DECODING_SIZE: usize = 32 * 1024 * 1024;

/// [`KvBackend`] implementation that stores its data in an etcd cluster.
pub struct EtcdStore {
    /// Connected etcd client; every request derives a fresh `KvClient` from it.
    client: Client,
    /// Upper bound on the number of operations packed into one etcd transaction
    /// by the batch operations; larger batches are split into several transactions.
    // NOTE(review): this should not exceed the etcd server's `--max-txn-ops`
    // setting, otherwise batch transactions are rejected — confirm per deployment.
    max_txn_ops: usize,
    /// Maximum size, in bytes, of a decoded etcd response message.
    max_decoding_size: usize,
}
47
48impl EtcdStore {
49 pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
50 where
51 E: AsRef<str>,
52 S: AsRef<[E]>,
53 {
54 let client = Client::connect(endpoints, None)
55 .await
56 .context(error::ConnectEtcdSnafu)?;
57
58 Ok(Self::with_etcd_client(client, max_txn_ops))
59 }
60
61 pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
62 info!("Connected to etcd");
63 Arc::new(Self {
64 client,
65 max_txn_ops,
66 max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
67 })
68 }
69
70 pub fn set_max_decoding_size(&mut self, max_decoding_size: usize) {
71 self.max_decoding_size = max_decoding_size;
72 }
73
74 fn kv_client(&self) -> etcd_client::KvClient {
75 self.client
76 .kv_client()
77 .max_decoding_message_size(self.max_decoding_size)
78 }
79
80 async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
81 let max_txn_ops = self.max_txn_ops();
82 if txn_ops.len() < max_txn_ops {
83 let _timer = METRIC_META_TXN_REQUEST
85 .with_label_values(&["etcd", "txn"])
86 .start_timer();
87 let txn = Txn::new().and_then(txn_ops);
88 let txn_res = self
89 .kv_client()
90 .txn(txn)
91 .await
92 .context(error::EtcdFailedSnafu)?;
93 return Ok(vec![txn_res]);
94 }
95
96 let txns = txn_ops
97 .chunks(max_txn_ops)
98 .map(|part| async move {
99 let _timer = METRIC_META_TXN_REQUEST
100 .with_label_values(&["etcd", "txn"])
101 .start_timer();
102 let txn = Txn::new().and_then(part);
103 self.kv_client().txn(txn).await
104 })
105 .collect::<Vec<_>>();
106
107 futures::future::try_join_all(txns)
108 .await
109 .context(error::EtcdFailedSnafu)
110 }
111}
112
113#[async_trait::async_trait]
114impl KvBackend for EtcdStore {
115 fn name(&self) -> &str {
116 "Etcd"
117 }
118
119 fn as_any(&self) -> &dyn Any {
120 self
121 }
122
123 async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
124 let Get { key, options } = req.try_into()?;
125
126 let mut res = self
127 .kv_client()
128 .get(key, options)
129 .await
130 .context(error::EtcdFailedSnafu)?;
131
132 let kvs = res
133 .take_kvs()
134 .into_iter()
135 .map(KeyValue::from)
136 .collect::<Vec<_>>();
137
138 Ok(RangeResponse {
139 kvs,
140 more: res.more(),
141 })
142 }
143
144 async fn put(&self, req: PutRequest) -> Result<PutResponse> {
145 let Put {
146 key,
147 value,
148 options,
149 } = req.try_into()?;
150
151 let mut res = self
152 .kv_client()
153 .put(key, value, options)
154 .await
155 .context(error::EtcdFailedSnafu)?;
156
157 let prev_kv = res.take_prev_key().map(KeyValue::from);
158 Ok(PutResponse { prev_kv })
159 }
160
161 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
162 let BatchPut { kvs, options } = req.try_into()?;
163
164 let put_ops = kvs
165 .into_iter()
166 .map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
167 .collect::<Vec<_>>();
168
169 let txn_responses = self.do_multi_txn(put_ops).await?;
170
171 let mut prev_kvs = vec![];
172 for txn_res in txn_responses {
173 for op_res in txn_res.op_responses() {
174 match op_res {
175 TxnOpResponse::Put(mut put_res) => {
176 if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
177 prev_kvs.push(prev_kv);
178 }
179 }
180 _ => unreachable!(),
181 }
182 }
183 }
184
185 Ok(BatchPutResponse { prev_kvs })
186 }
187
188 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
189 let BatchGet { keys, options } = req.try_into()?;
190
191 let get_ops: Vec<_> = keys
192 .into_iter()
193 .map(|key| TxnOp::get(key, options.clone()))
194 .collect();
195
196 let txn_responses = self.do_multi_txn(get_ops).await?;
197
198 let mut kvs = vec![];
199 for txn_res in txn_responses {
200 for op_res in txn_res.op_responses() {
201 let mut get_res = match op_res {
202 TxnOpResponse::Get(get_res) => get_res,
203 _ => unreachable!(),
204 };
205 kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
206 }
207 }
208
209 Ok(BatchGetResponse { kvs })
210 }
211
212 async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
213 let Delete { key, options } = req.try_into()?;
214
215 let mut res = self
216 .kv_client()
217 .delete(key, options)
218 .await
219 .context(error::EtcdFailedSnafu)?;
220
221 let prev_kvs = res
222 .take_prev_kvs()
223 .into_iter()
224 .map(KeyValue::from)
225 .collect::<Vec<_>>();
226
227 Ok(DeleteRangeResponse {
228 deleted: res.deleted(),
229 prev_kvs,
230 })
231 }
232
233 async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
234 let BatchDelete { keys, options } = req.try_into()?;
235
236 let mut prev_kvs = Vec::with_capacity(keys.len());
237
238 let delete_ops = keys
239 .into_iter()
240 .map(|key| TxnOp::delete(key, options.clone()))
241 .collect::<Vec<_>>();
242
243 let txn_responses = self.do_multi_txn(delete_ops).await?;
244
245 for txn_res in txn_responses {
246 for op_res in txn_res.op_responses() {
247 match op_res {
248 TxnOpResponse::Delete(mut delete_res) => {
249 delete_res
250 .take_prev_kvs()
251 .into_iter()
252 .map(KeyValue::from)
253 .for_each(|kv| {
254 prev_kvs.push(kv);
255 });
256 }
257 _ => unreachable!(),
258 }
259 }
260 }
261
262 Ok(BatchDeleteResponse { prev_kvs })
263 }
264}
265
266#[async_trait::async_trait]
267impl TxnService for EtcdStore {
268 type Error = Error;
269
270 async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
271 let _timer = METRIC_META_TXN_REQUEST
272 .with_label_values(&["etcd", "txn"])
273 .start_timer();
274
275 let max_operations = txn.max_operations();
276
277 let etcd_txn: Txn = txn.into();
278 let txn_res = self
279 .kv_client()
280 .txn(etcd_txn)
281 .await
282 .context(error::EtcdTxnFailedSnafu { max_operations })?;
283 txn_res.try_into()
284 }
285
286 fn max_txn_ops(&self) -> usize {
287 self.max_txn_ops
288 }
289}
290
291struct Get {
292 key: Vec<u8>,
293 options: Option<GetOptions>,
294}
295
296impl TryFrom<RangeRequest> for Get {
297 type Error = Error;
298
299 fn try_from(req: RangeRequest) -> Result<Self> {
300 let RangeRequest {
301 key,
302 range_end,
303 limit,
304 keys_only,
305 } = req;
306
307 ensure!(!key.is_empty(), error::EmptyKeySnafu);
308
309 let mut options = GetOptions::default();
310 if !range_end.is_empty() {
311 options = options.with_range(range_end);
312 if limit > 0 {
313 options = options.with_limit(limit);
314 }
315 }
316 if keys_only {
317 options = options.with_keys_only();
318 }
319
320 Ok(Get {
321 key,
322 options: Some(options),
323 })
324 }
325}
326
327struct Put {
328 key: Vec<u8>,
329 value: Vec<u8>,
330 options: Option<PutOptions>,
331}
332
333impl TryFrom<PutRequest> for Put {
334 type Error = Error;
335
336 fn try_from(req: PutRequest) -> Result<Self> {
337 let PutRequest {
338 key,
339 value,
340 prev_kv,
341 } = req;
342
343 let mut options = PutOptions::default();
344 if prev_kv {
345 options = options.with_prev_key();
346 }
347
348 Ok(Put {
349 key,
350 value,
351 options: Some(options),
352 })
353 }
354}
355
356struct BatchGet {
357 keys: Vec<Vec<u8>>,
358 options: Option<GetOptions>,
359}
360
361impl TryFrom<BatchGetRequest> for BatchGet {
362 type Error = Error;
363
364 fn try_from(req: BatchGetRequest) -> Result<Self> {
365 let BatchGetRequest { keys } = req;
366
367 let options = GetOptions::default();
368
369 Ok(BatchGet {
370 keys,
371 options: Some(options),
372 })
373 }
374}
375
376struct BatchPut {
377 kvs: Vec<KeyValue>,
378 options: Option<PutOptions>,
379}
380
381impl TryFrom<BatchPutRequest> for BatchPut {
382 type Error = Error;
383
384 fn try_from(req: BatchPutRequest) -> Result<Self> {
385 let BatchPutRequest { kvs, prev_kv } = req;
386
387 let mut options = PutOptions::default();
388 if prev_kv {
389 options = options.with_prev_key();
390 }
391
392 Ok(BatchPut {
393 kvs,
394 options: Some(options),
395 })
396 }
397}
398
399struct BatchDelete {
400 keys: Vec<Vec<u8>>,
401 options: Option<DeleteOptions>,
402}
403
404impl TryFrom<BatchDeleteRequest> for BatchDelete {
405 type Error = Error;
406
407 fn try_from(req: BatchDeleteRequest) -> Result<Self> {
408 let BatchDeleteRequest { keys, prev_kv } = req;
409
410 let mut options = DeleteOptions::default();
411 if prev_kv {
412 options = options.with_prev_key();
413 }
414
415 Ok(BatchDelete {
416 keys,
417 options: Some(options),
418 })
419 }
420}
421
422struct Delete {
423 key: Vec<u8>,
424 options: Option<DeleteOptions>,
425}
426
427impl TryFrom<DeleteRangeRequest> for Delete {
428 type Error = Error;
429
430 fn try_from(req: DeleteRangeRequest) -> Result<Self> {
431 let DeleteRangeRequest {
432 key,
433 range_end,
434 prev_kv,
435 } = req;
436
437 ensure!(!key.is_empty(), error::EmptyKeySnafu);
438
439 let mut options = DeleteOptions::default();
440 if !range_end.is_empty() {
441 options = options.with_range(range_end);
442 }
443 if prev_kv {
444 options = options.with_prev_key();
445 }
446
447 Ok(Delete {
448 key,
449 options: Some(options),
450 })
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
        test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
        unprepare_kv,
    };

    #[test]
    fn test_parse_get() {
        let req = RangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            limit: 64,
            keys_only: true,
        };

        let get: Get = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), get.key);
        let _ = get.options.unwrap();
    }

    /// An empty key must be rejected before any etcd call is made.
    #[test]
    fn test_parse_get_rejects_empty_key() {
        let req = RangeRequest {
            key: vec![],
            range_end: vec![],
            limit: 0,
            keys_only: false,
        };

        assert!(Get::try_from(req).is_err());
    }

    #[test]
    fn test_parse_put() {
        let req = PutRequest {
            key: b"test_key".to_vec(),
            value: b"test_value".to_vec(),
            prev_kv: true,
        };

        let put: Put = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), put.key);
        assert_eq!(b"test_value".to_vec(), put.value);
        let _ = put.options.unwrap();
    }

    #[test]
    fn test_parse_batch_get() {
        let req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        };

        let batch_get: BatchGet = req.try_into().unwrap();
        assert!(batch_get.options.is_some());
        let keys = batch_get.keys;

        assert_eq!(keys.len(), 3);
        assert_eq!(b"k1".to_vec(), keys.first().unwrap().clone());
        assert_eq!(b"k2".to_vec(), keys.get(1).unwrap().clone());
        assert_eq!(b"k3".to_vec(), keys.get(2).unwrap().clone());
    }

    #[test]
    fn test_parse_batch_put() {
        let req = BatchPutRequest {
            kvs: vec![KeyValue {
                key: b"test_key".to_vec(),
                value: b"test_value".to_vec(),
            }],
            prev_kv: true,
        };

        let batch_put: BatchPut = req.try_into().unwrap();

        let kv = batch_put.kvs.first().unwrap();
        assert_eq!(b"test_key", kv.key());
        assert_eq!(b"test_value", kv.value());
        let _ = batch_put.options.unwrap();
    }

    #[test]
    fn test_parse_batch_delete() {
        let req = BatchDeleteRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
            prev_kv: true,
        };

        let batch_delete: BatchDelete = req.try_into().unwrap();

        assert_eq!(batch_delete.keys.len(), 3);
        assert_eq!(b"k1".to_vec(), batch_delete.keys.first().unwrap().clone());
        assert_eq!(b"k2".to_vec(), batch_delete.keys.get(1).unwrap().clone());
        assert_eq!(b"k3".to_vec(), batch_delete.keys.get(2).unwrap().clone());
        let _ = batch_delete.options.unwrap();
    }

    #[test]
    fn test_parse_delete() {
        let req = DeleteRangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            prev_kv: true,
        };

        let delete: Delete = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), delete.key);
        let _ = delete.options.unwrap();
    }

    /// An empty key must be rejected before any etcd call is made.
    #[test]
    fn test_parse_delete_rejects_empty_key() {
        let req = DeleteRangeRequest {
            key: vec![],
            range_end: vec![],
            prev_kv: false,
        };

        assert!(Delete::try_from(req).is_err());
    }

    /// Builds an [`EtcdStore`] from the comma-separated `GT_ETCD_ENDPOINTS`
    /// environment variable.
    ///
    /// Returns `None` (so the etcd-backed tests are skipped) when the variable
    /// is unset or lists no non-blank endpoints. Entries are trimmed, so
    /// `"a, b"` yields `["a", "b"]`.
    async fn build_kv_backend() -> Option<EtcdStore> {
        let endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        let endpoints = endpoints
            .split(',')
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .map(str::to_string)
            .collect::<Vec<String>>();
        if endpoints.is_empty() {
            return None;
        }

        let client = Client::connect(endpoints, None)
            .await
            .expect("malformed endpoints");

        Some(EtcdStore {
            client,
            max_txn_ops: 128,
            max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
        })
    }

    #[tokio::test]
    async fn test_put() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"put/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_range() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"range/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_range_2() {
        if let Some(kv_backend) = build_kv_backend().await {
            test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await;
        }
    }

    #[tokio::test]
    async fn test_batch_get() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"batchGet/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put() {
        if let Some(kv_backend) = build_kv_backend().await {
            let kv_backend = Arc::new(kv_backend);
            test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
        }
    }

    #[tokio::test]
    async fn test_delete_range() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"deleteRange/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_batch_delete() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"batchDelete/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_etcd_txn() {
        if let Some(kv_backend) = build_kv_backend().await {
            test_txn_one_compare_op(&kv_backend).await;
            text_txn_multi_compare_op(&kv_backend).await;
            test_txn_compare_equal(&kv_backend).await;
            test_txn_compare_greater(&kv_backend).await;
            test_txn_compare_less(&kv_backend).await;
            test_txn_compare_not_equal(&kv_backend).await;
        }
    }
}