common_meta/kv_backend/etcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use etcd_client::{
20    Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
21};
22use snafu::{ResultExt, ensure};
23
24use crate::error::{self, Error, Result};
25use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
26use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
27use crate::metrics::METRIC_META_TXN_REQUEST;
28use crate::rpc::KeyValue;
29use crate::rpc::store::{
30    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
31    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
32    RangeRequest, RangeResponse,
33};
34
/// Default maximum size, in bytes, of a response message decoded by the etcd KV client (32MB).
const DEFAULT_MAX_DECODING_SIZE: usize = 32 << 20;
36
37pub struct EtcdStore {
38    client: Client,
39    // Maximum number of operations permitted in a transaction.
40    // The etcd default configuration's `--max-txn-ops` is 128.
41    //
42    // For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
43    max_txn_ops: usize,
44    // Maximum decoding message size in bytes. Default 32MB.
45    max_decoding_size: usize,
46}
47
48impl EtcdStore {
49    pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
50    where
51        E: AsRef<str>,
52        S: AsRef<[E]>,
53    {
54        let client = Client::connect(endpoints, None)
55            .await
56            .context(error::ConnectEtcdSnafu)?;
57
58        Ok(Self::with_etcd_client(client, max_txn_ops))
59    }
60
61    pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
62        info!("Connected to etcd");
63        Arc::new(Self {
64            client,
65            max_txn_ops,
66            max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
67        })
68    }
69
70    pub fn set_max_decoding_size(&mut self, max_decoding_size: usize) {
71        self.max_decoding_size = max_decoding_size;
72    }
73
74    fn kv_client(&self) -> etcd_client::KvClient {
75        self.client
76            .kv_client()
77            .max_decoding_message_size(self.max_decoding_size)
78    }
79
80    async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
81        let max_txn_ops = self.max_txn_ops();
82        if txn_ops.len() < max_txn_ops {
83            // fast path
84            let _timer = METRIC_META_TXN_REQUEST
85                .with_label_values(&["etcd", "txn"])
86                .start_timer();
87            let txn = Txn::new().and_then(txn_ops);
88            let txn_res = self
89                .kv_client()
90                .txn(txn)
91                .await
92                .context(error::EtcdFailedSnafu)?;
93            return Ok(vec![txn_res]);
94        }
95
96        let txns = txn_ops
97            .chunks(max_txn_ops)
98            .map(|part| async move {
99                let _timer = METRIC_META_TXN_REQUEST
100                    .with_label_values(&["etcd", "txn"])
101                    .start_timer();
102                let txn = Txn::new().and_then(part);
103                self.kv_client().txn(txn).await
104            })
105            .collect::<Vec<_>>();
106
107        futures::future::try_join_all(txns)
108            .await
109            .context(error::EtcdFailedSnafu)
110    }
111}
112
113#[async_trait::async_trait]
114impl KvBackend for EtcdStore {
115    fn name(&self) -> &str {
116        "Etcd"
117    }
118
119    fn as_any(&self) -> &dyn Any {
120        self
121    }
122
123    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
124        let Get { key, options } = req.try_into()?;
125
126        let mut res = self
127            .kv_client()
128            .get(key, options)
129            .await
130            .context(error::EtcdFailedSnafu)?;
131
132        let kvs = res
133            .take_kvs()
134            .into_iter()
135            .map(KeyValue::from)
136            .collect::<Vec<_>>();
137
138        Ok(RangeResponse {
139            kvs,
140            more: res.more(),
141        })
142    }
143
144    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
145        let Put {
146            key,
147            value,
148            options,
149        } = req.try_into()?;
150
151        let mut res = self
152            .kv_client()
153            .put(key, value, options)
154            .await
155            .context(error::EtcdFailedSnafu)?;
156
157        let prev_kv = res.take_prev_key().map(KeyValue::from);
158        Ok(PutResponse { prev_kv })
159    }
160
161    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
162        let BatchPut { kvs, options } = req.try_into()?;
163
164        let put_ops = kvs
165            .into_iter()
166            .map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
167            .collect::<Vec<_>>();
168
169        let txn_responses = self.do_multi_txn(put_ops).await?;
170
171        let mut prev_kvs = vec![];
172        for txn_res in txn_responses {
173            for op_res in txn_res.op_responses() {
174                match op_res {
175                    TxnOpResponse::Put(mut put_res) => {
176                        if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
177                            prev_kvs.push(prev_kv);
178                        }
179                    }
180                    _ => unreachable!(),
181                }
182            }
183        }
184
185        Ok(BatchPutResponse { prev_kvs })
186    }
187
188    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
189        let BatchGet { keys, options } = req.try_into()?;
190
191        let get_ops: Vec<_> = keys
192            .into_iter()
193            .map(|key| TxnOp::get(key, options.clone()))
194            .collect();
195
196        let txn_responses = self.do_multi_txn(get_ops).await?;
197
198        let mut kvs = vec![];
199        for txn_res in txn_responses {
200            for op_res in txn_res.op_responses() {
201                let mut get_res = match op_res {
202                    TxnOpResponse::Get(get_res) => get_res,
203                    _ => unreachable!(),
204                };
205                kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
206            }
207        }
208
209        Ok(BatchGetResponse { kvs })
210    }
211
212    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
213        let Delete { key, options } = req.try_into()?;
214
215        let mut res = self
216            .kv_client()
217            .delete(key, options)
218            .await
219            .context(error::EtcdFailedSnafu)?;
220
221        let prev_kvs = res
222            .take_prev_kvs()
223            .into_iter()
224            .map(KeyValue::from)
225            .collect::<Vec<_>>();
226
227        Ok(DeleteRangeResponse {
228            deleted: res.deleted(),
229            prev_kvs,
230        })
231    }
232
233    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
234        let BatchDelete { keys, options } = req.try_into()?;
235
236        let mut prev_kvs = Vec::with_capacity(keys.len());
237
238        let delete_ops = keys
239            .into_iter()
240            .map(|key| TxnOp::delete(key, options.clone()))
241            .collect::<Vec<_>>();
242
243        let txn_responses = self.do_multi_txn(delete_ops).await?;
244
245        for txn_res in txn_responses {
246            for op_res in txn_res.op_responses() {
247                match op_res {
248                    TxnOpResponse::Delete(mut delete_res) => {
249                        delete_res
250                            .take_prev_kvs()
251                            .into_iter()
252                            .map(KeyValue::from)
253                            .for_each(|kv| {
254                                prev_kvs.push(kv);
255                            });
256                    }
257                    _ => unreachable!(),
258                }
259            }
260        }
261
262        Ok(BatchDeleteResponse { prev_kvs })
263    }
264}
265
266#[async_trait::async_trait]
267impl TxnService for EtcdStore {
268    type Error = Error;
269
270    async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
271        let _timer = METRIC_META_TXN_REQUEST
272            .with_label_values(&["etcd", "txn"])
273            .start_timer();
274
275        let max_operations = txn.max_operations();
276
277        let etcd_txn: Txn = txn.into();
278        let txn_res = self
279            .kv_client()
280            .txn(etcd_txn)
281            .await
282            .context(error::EtcdTxnFailedSnafu { max_operations })?;
283        txn_res.try_into()
284    }
285
286    fn max_txn_ops(&self) -> usize {
287        self.max_txn_ops
288    }
289}
290
291struct Get {
292    key: Vec<u8>,
293    options: Option<GetOptions>,
294}
295
296impl TryFrom<RangeRequest> for Get {
297    type Error = Error;
298
299    fn try_from(req: RangeRequest) -> Result<Self> {
300        let RangeRequest {
301            key,
302            range_end,
303            limit,
304            keys_only,
305        } = req;
306
307        ensure!(!key.is_empty(), error::EmptyKeySnafu);
308
309        let mut options = GetOptions::default();
310        if !range_end.is_empty() {
311            options = options.with_range(range_end);
312            if limit > 0 {
313                options = options.with_limit(limit);
314            }
315        }
316        if keys_only {
317            options = options.with_keys_only();
318        }
319
320        Ok(Get {
321            key,
322            options: Some(options),
323        })
324    }
325}
326
327struct Put {
328    key: Vec<u8>,
329    value: Vec<u8>,
330    options: Option<PutOptions>,
331}
332
333impl TryFrom<PutRequest> for Put {
334    type Error = Error;
335
336    fn try_from(req: PutRequest) -> Result<Self> {
337        let PutRequest {
338            key,
339            value,
340            prev_kv,
341        } = req;
342
343        let mut options = PutOptions::default();
344        if prev_kv {
345            options = options.with_prev_key();
346        }
347
348        Ok(Put {
349            key,
350            value,
351            options: Some(options),
352        })
353    }
354}
355
356struct BatchGet {
357    keys: Vec<Vec<u8>>,
358    options: Option<GetOptions>,
359}
360
361impl TryFrom<BatchGetRequest> for BatchGet {
362    type Error = Error;
363
364    fn try_from(req: BatchGetRequest) -> Result<Self> {
365        let BatchGetRequest { keys } = req;
366
367        let options = GetOptions::default();
368
369        Ok(BatchGet {
370            keys,
371            options: Some(options),
372        })
373    }
374}
375
376struct BatchPut {
377    kvs: Vec<KeyValue>,
378    options: Option<PutOptions>,
379}
380
381impl TryFrom<BatchPutRequest> for BatchPut {
382    type Error = Error;
383
384    fn try_from(req: BatchPutRequest) -> Result<Self> {
385        let BatchPutRequest { kvs, prev_kv } = req;
386
387        let mut options = PutOptions::default();
388        if prev_kv {
389            options = options.with_prev_key();
390        }
391
392        Ok(BatchPut {
393            kvs,
394            options: Some(options),
395        })
396    }
397}
398
399struct BatchDelete {
400    keys: Vec<Vec<u8>>,
401    options: Option<DeleteOptions>,
402}
403
404impl TryFrom<BatchDeleteRequest> for BatchDelete {
405    type Error = Error;
406
407    fn try_from(req: BatchDeleteRequest) -> Result<Self> {
408        let BatchDeleteRequest { keys, prev_kv } = req;
409
410        let mut options = DeleteOptions::default();
411        if prev_kv {
412            options = options.with_prev_key();
413        }
414
415        Ok(BatchDelete {
416            keys,
417            options: Some(options),
418        })
419    }
420}
421
422struct Delete {
423    key: Vec<u8>,
424    options: Option<DeleteOptions>,
425}
426
427impl TryFrom<DeleteRangeRequest> for Delete {
428    type Error = Error;
429
430    fn try_from(req: DeleteRangeRequest) -> Result<Self> {
431        let DeleteRangeRequest {
432            key,
433            range_end,
434            prev_kv,
435        } = req;
436
437        ensure!(!key.is_empty(), error::EmptyKeySnafu);
438
439        let mut options = DeleteOptions::default();
440        if !range_end.is_empty() {
441            options = options.with_range(range_end);
442        }
443        if prev_kv {
444            options = options.with_prev_key();
445        }
446
447        Ok(Delete {
448            key,
449            options: Some(options),
450        })
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
        test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
        unprepare_kv,
    };

    // Request-conversion tests: these need no running etcd.

    #[test]
    fn test_parse_get() {
        let req = RangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            limit: 64,
            keys_only: true,
        };

        let Get { key, options } = req.try_into().unwrap();

        assert_eq!(key, b"test_key".to_vec());
        assert!(options.is_some());
    }

    #[test]
    fn test_parse_put() {
        let req = PutRequest {
            key: b"test_key".to_vec(),
            value: b"test_value".to_vec(),
            prev_kv: true,
        };

        let Put {
            key,
            value,
            options,
        } = req.try_into().unwrap();

        assert_eq!(key, b"test_key".to_vec());
        assert_eq!(value, b"test_value".to_vec());
        assert!(options.is_some());
    }

    #[test]
    fn test_parse_batch_get() {
        let req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        };

        let BatchGet { keys, .. } = req.try_into().unwrap();

        assert_eq!(keys[..3], [b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()]);
    }

    #[test]
    fn test_parse_batch_put() {
        let req = BatchPutRequest {
            kvs: vec![KeyValue {
                key: b"test_key".to_vec(),
                value: b"test_value".to_vec(),
            }],
            prev_kv: true,
        };

        let BatchPut { kvs, options } = req.try_into().unwrap();

        let first = kvs.first().unwrap();
        assert_eq!(b"test_key", first.key());
        assert_eq!(b"test_value", first.value());
        assert!(options.is_some());
    }

    #[test]
    fn test_parse_batch_delete() {
        let req = BatchDeleteRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
            prev_kv: true,
        };

        let BatchDelete { keys, options } = req.try_into().unwrap();

        assert_eq!(keys, vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()]);
        assert!(options.is_some());
    }

    #[test]
    fn test_parse_delete() {
        let req = DeleteRangeRequest {
            key: b"test_key".to_vec(),
            range_end: b"test_range_end".to_vec(),
            prev_kv: true,
        };

        let Delete { key, options } = req.try_into().unwrap();

        assert_eq!(key, b"test_key".to_vec());
        assert!(options.is_some());
    }

    // Integration tests: these run only when `GT_ETCD_ENDPOINTS` is set
    // (comma-separated endpoint list); otherwise each test returns immediately.

    /// Builds a store against `GT_ETCD_ENDPOINTS`, or `None` if the variable is unset/empty.
    async fn build_kv_backend() -> Option<EtcdStore> {
        let raw_endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        if raw_endpoints.is_empty() {
            return None;
        }

        let endpoints: Vec<String> = raw_endpoints.split(',').map(String::from).collect();
        let client = Client::connect(endpoints, None)
            .await
            .expect("malformed endpoints");

        Some(EtcdStore {
            client,
            max_txn_ops: 128,
            max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
        })
    }

    #[tokio::test]
    async fn test_put() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        let prefix = b"put/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    #[tokio::test]
    async fn test_range() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        let prefix = b"range/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    #[tokio::test]
    async fn test_range_2() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await;
    }

    #[tokio::test]
    async fn test_batch_get() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        let prefix = b"batchGet/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        let shared_backend = Arc::new(kv_backend);
        test_kv_compare_and_put_with_prefix(shared_backend, b"compareAndPut/".to_vec()).await;
    }

    #[tokio::test]
    async fn test_delete_range() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        let prefix = b"deleteRange/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        let prefix = b"batchDelete/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    #[tokio::test]
    async fn test_etcd_txn() {
        let Some(kv_backend) = build_kv_backend().await else {
            return;
        };
        test_txn_one_compare_op(&kv_backend).await;
        text_txn_multi_compare_op(&kv_backend).await;
        test_txn_compare_equal(&kv_backend).await;
        test_txn_compare_greater(&kv_backend).await;
        test_txn_compare_less(&kv_backend).await;
        test_txn_compare_not_equal(&kv_backend).await;
    }
}