common_meta/kv_backend/etcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use etcd_client::{
20    Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
21};
22use snafu::{ensure, ResultExt};
23
24use crate::error::{self, Error, Result};
25use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
26use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
27use crate::metrics::METRIC_META_TXN_REQUEST;
28use crate::rpc::store::{
29    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
31    RangeRequest, RangeResponse,
32};
33use crate::rpc::KeyValue;
34
35pub struct EtcdStore {
36    client: Client,
37    // Maximum number of operations permitted in a transaction.
38    // The etcd default configuration's `--max-txn-ops` is 128.
39    //
40    // For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
41    max_txn_ops: usize,
42}
43
44impl EtcdStore {
45    pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
46    where
47        E: AsRef<str>,
48        S: AsRef<[E]>,
49    {
50        let client = Client::connect(endpoints, None)
51            .await
52            .context(error::ConnectEtcdSnafu)?;
53
54        Ok(Self::with_etcd_client(client, max_txn_ops))
55    }
56
57    pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
58        info!("Connected to etcd");
59        Arc::new(Self {
60            client,
61            max_txn_ops,
62        })
63    }
64
65    async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
66        let max_txn_ops = self.max_txn_ops();
67        if txn_ops.len() < max_txn_ops {
68            // fast path
69            let _timer = METRIC_META_TXN_REQUEST
70                .with_label_values(&["etcd", "txn"])
71                .start_timer();
72            let txn = Txn::new().and_then(txn_ops);
73            let txn_res = self
74                .client
75                .kv_client()
76                .txn(txn)
77                .await
78                .context(error::EtcdFailedSnafu)?;
79            return Ok(vec![txn_res]);
80        }
81
82        let txns = txn_ops
83            .chunks(max_txn_ops)
84            .map(|part| async move {
85                let _timer = METRIC_META_TXN_REQUEST
86                    .with_label_values(&["etcd", "txn"])
87                    .start_timer();
88                let txn = Txn::new().and_then(part);
89                self.client.kv_client().txn(txn).await
90            })
91            .collect::<Vec<_>>();
92
93        futures::future::try_join_all(txns)
94            .await
95            .context(error::EtcdFailedSnafu)
96    }
97}
98
99#[async_trait::async_trait]
100impl KvBackend for EtcdStore {
101    fn name(&self) -> &str {
102        "Etcd"
103    }
104
105    fn as_any(&self) -> &dyn Any {
106        self
107    }
108
109    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
110        let Get { key, options } = req.try_into()?;
111
112        let mut res = self
113            .client
114            .kv_client()
115            .get(key, options)
116            .await
117            .context(error::EtcdFailedSnafu)?;
118
119        let kvs = res
120            .take_kvs()
121            .into_iter()
122            .map(KeyValue::from)
123            .collect::<Vec<_>>();
124
125        Ok(RangeResponse {
126            kvs,
127            more: res.more(),
128        })
129    }
130
131    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
132        let Put {
133            key,
134            value,
135            options,
136        } = req.try_into()?;
137
138        let mut res = self
139            .client
140            .kv_client()
141            .put(key, value, options)
142            .await
143            .context(error::EtcdFailedSnafu)?;
144
145        let prev_kv = res.take_prev_key().map(KeyValue::from);
146        Ok(PutResponse { prev_kv })
147    }
148
149    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
150        let BatchPut { kvs, options } = req.try_into()?;
151
152        let put_ops = kvs
153            .into_iter()
154            .map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
155            .collect::<Vec<_>>();
156
157        let txn_responses = self.do_multi_txn(put_ops).await?;
158
159        let mut prev_kvs = vec![];
160        for txn_res in txn_responses {
161            for op_res in txn_res.op_responses() {
162                match op_res {
163                    TxnOpResponse::Put(mut put_res) => {
164                        if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
165                            prev_kvs.push(prev_kv);
166                        }
167                    }
168                    _ => unreachable!(),
169                }
170            }
171        }
172
173        Ok(BatchPutResponse { prev_kvs })
174    }
175
176    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
177        let BatchGet { keys, options } = req.try_into()?;
178
179        let get_ops: Vec<_> = keys
180            .into_iter()
181            .map(|key| TxnOp::get(key, options.clone()))
182            .collect();
183
184        let txn_responses = self.do_multi_txn(get_ops).await?;
185
186        let mut kvs = vec![];
187        for txn_res in txn_responses {
188            for op_res in txn_res.op_responses() {
189                let mut get_res = match op_res {
190                    TxnOpResponse::Get(get_res) => get_res,
191                    _ => unreachable!(),
192                };
193                kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
194            }
195        }
196
197        Ok(BatchGetResponse { kvs })
198    }
199
200    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
201        let Delete { key, options } = req.try_into()?;
202
203        let mut res = self
204            .client
205            .kv_client()
206            .delete(key, options)
207            .await
208            .context(error::EtcdFailedSnafu)?;
209
210        let prev_kvs = res
211            .take_prev_kvs()
212            .into_iter()
213            .map(KeyValue::from)
214            .collect::<Vec<_>>();
215
216        Ok(DeleteRangeResponse {
217            deleted: res.deleted(),
218            prev_kvs,
219        })
220    }
221
222    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
223        let BatchDelete { keys, options } = req.try_into()?;
224
225        let mut prev_kvs = Vec::with_capacity(keys.len());
226
227        let delete_ops = keys
228            .into_iter()
229            .map(|key| TxnOp::delete(key, options.clone()))
230            .collect::<Vec<_>>();
231
232        let txn_responses = self.do_multi_txn(delete_ops).await?;
233
234        for txn_res in txn_responses {
235            for op_res in txn_res.op_responses() {
236                match op_res {
237                    TxnOpResponse::Delete(mut delete_res) => {
238                        delete_res
239                            .take_prev_kvs()
240                            .into_iter()
241                            .map(KeyValue::from)
242                            .for_each(|kv| {
243                                prev_kvs.push(kv);
244                            });
245                    }
246                    _ => unreachable!(),
247                }
248            }
249        }
250
251        Ok(BatchDeleteResponse { prev_kvs })
252    }
253}
254
255#[async_trait::async_trait]
256impl TxnService for EtcdStore {
257    type Error = Error;
258
259    async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
260        let _timer = METRIC_META_TXN_REQUEST
261            .with_label_values(&["etcd", "txn"])
262            .start_timer();
263
264        let max_operations = txn.max_operations();
265
266        let etcd_txn: Txn = txn.into();
267        let txn_res = self
268            .client
269            .kv_client()
270            .txn(etcd_txn)
271            .await
272            .context(error::EtcdTxnFailedSnafu { max_operations })?;
273        txn_res.try_into()
274    }
275
276    fn max_txn_ops(&self) -> usize {
277        self.max_txn_ops
278    }
279}
280
281struct Get {
282    key: Vec<u8>,
283    options: Option<GetOptions>,
284}
285
286impl TryFrom<RangeRequest> for Get {
287    type Error = Error;
288
289    fn try_from(req: RangeRequest) -> Result<Self> {
290        let RangeRequest {
291            key,
292            range_end,
293            limit,
294            keys_only,
295        } = req;
296
297        ensure!(!key.is_empty(), error::EmptyKeySnafu);
298
299        let mut options = GetOptions::default();
300        if !range_end.is_empty() {
301            options = options.with_range(range_end);
302            if limit > 0 {
303                options = options.with_limit(limit);
304            }
305        }
306        if keys_only {
307            options = options.with_keys_only();
308        }
309
310        Ok(Get {
311            key,
312            options: Some(options),
313        })
314    }
315}
316
317struct Put {
318    key: Vec<u8>,
319    value: Vec<u8>,
320    options: Option<PutOptions>,
321}
322
323impl TryFrom<PutRequest> for Put {
324    type Error = Error;
325
326    fn try_from(req: PutRequest) -> Result<Self> {
327        let PutRequest {
328            key,
329            value,
330            prev_kv,
331        } = req;
332
333        let mut options = PutOptions::default();
334        if prev_kv {
335            options = options.with_prev_key();
336        }
337
338        Ok(Put {
339            key,
340            value,
341            options: Some(options),
342        })
343    }
344}
345
346struct BatchGet {
347    keys: Vec<Vec<u8>>,
348    options: Option<GetOptions>,
349}
350
351impl TryFrom<BatchGetRequest> for BatchGet {
352    type Error = Error;
353
354    fn try_from(req: BatchGetRequest) -> Result<Self> {
355        let BatchGetRequest { keys } = req;
356
357        let options = GetOptions::default();
358
359        Ok(BatchGet {
360            keys,
361            options: Some(options),
362        })
363    }
364}
365
366struct BatchPut {
367    kvs: Vec<KeyValue>,
368    options: Option<PutOptions>,
369}
370
371impl TryFrom<BatchPutRequest> for BatchPut {
372    type Error = Error;
373
374    fn try_from(req: BatchPutRequest) -> Result<Self> {
375        let BatchPutRequest { kvs, prev_kv } = req;
376
377        let mut options = PutOptions::default();
378        if prev_kv {
379            options = options.with_prev_key();
380        }
381
382        Ok(BatchPut {
383            kvs,
384            options: Some(options),
385        })
386    }
387}
388
389struct BatchDelete {
390    keys: Vec<Vec<u8>>,
391    options: Option<DeleteOptions>,
392}
393
394impl TryFrom<BatchDeleteRequest> for BatchDelete {
395    type Error = Error;
396
397    fn try_from(req: BatchDeleteRequest) -> Result<Self> {
398        let BatchDeleteRequest { keys, prev_kv } = req;
399
400        let mut options = DeleteOptions::default();
401        if prev_kv {
402            options = options.with_prev_key();
403        }
404
405        Ok(BatchDelete {
406            keys,
407            options: Some(options),
408        })
409    }
410}
411
412struct Delete {
413    key: Vec<u8>,
414    options: Option<DeleteOptions>,
415}
416
417impl TryFrom<DeleteRangeRequest> for Delete {
418    type Error = Error;
419
420    fn try_from(req: DeleteRangeRequest) -> Result<Self> {
421        let DeleteRangeRequest {
422            key,
423            range_end,
424            prev_kv,
425        } = req;
426
427        ensure!(!key.is_empty(), error::EmptyKeySnafu);
428
429        let mut options = DeleteOptions::default();
430        if !range_end.is_empty() {
431            options = options.with_range(range_end);
432        }
433        if prev_kv {
434            options = options.with_prev_key();
435        }
436
437        Ok(Delete {
438            key,
439            options: Some(options),
440        })
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
        test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
        unprepare_kv,
    };

    #[test]
    fn test_parse_get() {
        let req = RangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            limit: 64,
            keys_only: true,
        };

        let get: Get = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), get.key);
        assert!(get.options.is_some());
    }

    #[test]
    fn test_parse_get_rejects_empty_key() {
        let req = RangeRequest {
            key: vec![],
            range_end: vec![],
            limit: 0,
            keys_only: false,
        };

        assert!(Get::try_from(req).is_err());
    }

    #[test]
    fn test_parse_put() {
        let req = PutRequest {
            key: b"test_key".to_vec(),
            value: b"test_value".to_vec(),
            prev_kv: true,
        };

        let put: Put = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), put.key);
        assert_eq!(b"test_value".to_vec(), put.value);
        assert!(put.options.is_some());
    }

    #[test]
    fn test_parse_batch_get() {
        let req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        };

        let batch_get: BatchGet = req.try_into().unwrap();
        let keys = batch_get.keys;

        // Same shape of checks as `test_parse_batch_delete`.
        assert_eq!(keys.len(), 3);
        assert_eq!(b"k1".to_vec(), keys.first().unwrap().clone());
        assert_eq!(b"k2".to_vec(), keys.get(1).unwrap().clone());
        assert_eq!(b"k3".to_vec(), keys.get(2).unwrap().clone());
        assert!(batch_get.options.is_some());
    }

    #[test]
    fn test_parse_batch_put() {
        let req = BatchPutRequest {
            kvs: vec![KeyValue {
                key: b"test_key".to_vec(),
                value: b"test_value".to_vec(),
            }],
            prev_kv: true,
        };

        let batch_put: BatchPut = req.try_into().unwrap();

        let kv = batch_put.kvs.first().unwrap();
        assert_eq!(b"test_key", kv.key());
        assert_eq!(b"test_value", kv.value());
        assert!(batch_put.options.is_some());
    }

    #[test]
    fn test_parse_batch_delete() {
        let req = BatchDeleteRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
            prev_kv: true,
        };

        let batch_delete: BatchDelete = req.try_into().unwrap();

        assert_eq!(batch_delete.keys.len(), 3);
        assert_eq!(b"k1".to_vec(), batch_delete.keys.first().unwrap().clone());
        assert_eq!(b"k2".to_vec(), batch_delete.keys.get(1).unwrap().clone());
        assert_eq!(b"k3".to_vec(), batch_delete.keys.get(2).unwrap().clone());
        assert!(batch_delete.options.is_some());
    }

    #[test]
    fn test_parse_delete() {
        let req = DeleteRangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            prev_kv: true,
        };

        let delete: Delete = req.try_into().unwrap();

        assert_eq!(b"test_key".to_vec(), delete.key);
        assert!(delete.options.is_some());
    }

    #[test]
    fn test_parse_delete_rejects_empty_key() {
        let req = DeleteRangeRequest {
            key: vec![],
            range_end: vec![],
            prev_kv: false,
        };

        assert!(Delete::try_from(req).is_err());
    }

    /// Builds an [`EtcdStore`] from the comma-separated `GT_ETCD_ENDPOINTS`
    /// environment variable.
    ///
    /// Returns `None` when the variable is unset or empty, so the etcd
    /// integration tests below become no-ops instead of failing.
    async fn build_kv_backend() -> Option<EtcdStore> {
        let raw = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        if raw.is_empty() {
            return None;
        }

        let endpoints: Vec<String> = raw.split(',').map(String::from).collect();
        let client = Client::connect(endpoints, None)
            .await
            .expect("malformed endpoints");

        Some(EtcdStore {
            client,
            max_txn_ops: 128,
        })
    }

    #[tokio::test]
    async fn test_put() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"put/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_range() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"range/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_range_2() {
        if let Some(kv_backend) = build_kv_backend().await {
            test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await;
        }
    }

    #[tokio::test]
    async fn test_batch_get() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"batchGet/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put() {
        if let Some(kv_backend) = build_kv_backend().await {
            let kv_backend = Arc::new(kv_backend);
            test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
        }
    }

    #[tokio::test]
    async fn test_delete_range() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"deleteRange/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_batch_delete() {
        if let Some(kv_backend) = build_kv_backend().await {
            let prefix = b"batchDelete/";
            prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
            test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
            unprepare_kv(&kv_backend, prefix).await;
        }
    }

    #[tokio::test]
    async fn test_etcd_txn() {
        if let Some(kv_backend) = build_kv_backend().await {
            test_txn_one_compare_op(&kv_backend).await;
            text_txn_multi_compare_op(&kv_backend).await;
            test_txn_compare_equal(&kv_backend).await;
            test_txn_compare_greater(&kv_backend).await;
            test_txn_compare_less(&kv_backend).await;
            test_txn_compare_not_equal(&kv_backend).await;
        }
    }
}