common_meta/kv_backend/
rds.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use backon::{BackoffBuilder, ExponentialBuilder};
21use common_telemetry::debug;
22
23use crate::error::{Error, RdsTransactionRetryFailedSnafu, Result};
24use crate::kv_backend::txn::{
25    Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse,
26};
27use crate::kv_backend::{KvBackend, TxnService};
28use crate::metrics::METRIC_META_TXN_REQUEST;
29use crate::rpc::store::{
30    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
31    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
32    RangeRequest, RangeResponse,
33};
34use crate::rpc::KeyValue;
35
// Operation labels for the RDS store. None of them is referenced in this file;
// NOTE(review): presumably passed as `$label_op` to `record_rds_sql_execute_elapsed!`
// by the backend modules — confirm.
const RDS_STORE_OP_BATCH_GET: &str = "batch_get";
const RDS_STORE_OP_BATCH_PUT: &str = "batch_put";
const RDS_STORE_OP_RANGE_QUERY: &str = "range_query";
const RDS_STORE_OP_RANGE_DELETE: &str = "range_delete";
const RDS_STORE_OP_BATCH_DELETE: &str = "batch_delete";
41
42#[cfg(feature = "pg_kvbackend")]
43mod postgres;
44#[cfg(feature = "pg_kvbackend")]
45pub use postgres::PgStore;
46
47#[cfg(feature = "mysql_kvbackend")]
48mod mysql;
49#[cfg(feature = "mysql_kvbackend")]
50pub use mysql::MySqlStore;
51
/// Transaction retry count constant for RDS stores.
///
/// NOTE(review): not referenced in this file; presumably the default value for
/// `RdsStore::txn_retry_count` set by the backend constructors — confirm.
const RDS_STORE_TXN_RETRY_COUNT: usize = 3;
53
54/// Query executor for rds. It can execute queries or generate a transaction executor.
55#[async_trait::async_trait]
56pub trait Executor: Send + Sync {
57    type Transaction<'a>: 'a + Transaction<'a>
58    where
59        Self: 'a;
60
61    fn name() -> &'static str;
62
63    async fn query(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>>;
64
65    /// Some queries don't need to return any result, such as `DELETE`.
66    async fn execute(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<()> {
67        self.query(query, params).await?;
68        Ok(())
69    }
70
71    async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>>;
72}
73
74/// Transaction query executor for rds. It can execute queries in transaction or commit the transaction.
75#[async_trait::async_trait]
76pub trait Transaction<'a>: Send + Sync {
77    async fn query(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>>;
78
79    async fn execute(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<()> {
80        self.query(query, params).await?;
81        Ok(())
82    }
83
84    async fn commit(self) -> Result<()>;
85}
86
/// Factory for creating default and transaction query executors.
///
/// `RdsStore` obtains a default executor from it for every non-transactional
/// operation, and for transactions asks it for a transaction executor that
/// borrows such a default executor.
#[async_trait::async_trait]
pub trait ExecutorFactory<T: Executor>: Send + Sync {
    /// Obtains a default (non-transactional) executor.
    async fn default_executor(&self) -> Result<T>;

    /// Creates a transaction executor on top of `default_executor`, which stays
    /// mutably borrowed for `'a`.
    async fn txn_executor<'a>(&self, default_executor: &'a mut T) -> Result<T::Transaction<'a>>;
}
94
/// Rds backed store for metasrv.
///
/// Generic over the query executor `T`, the factory `S` that produces executors, and
/// `R`, stored as `sql_template_set`.
/// NOTE(review): `R` is not read in this file; presumably the backend's SQL templates,
/// used by the backend-specific `KvQueryExecutor` impls — confirm.
pub struct RdsStore<T, S, R>
where
    T: Executor + Send + Sync,
    S: ExecutorFactory<T> + Send + Sync,
{
    /// Value reported by `TxnService::max_txn_ops`.
    max_txn_ops: usize,
    /// Maximum number of backoff retries when a transaction fails with a
    /// serialization error.
    txn_retry_count: usize,
    /// Produces default and transaction executors.
    executor_factory: S,
    /// Backend-specific SQL template set (`R`).
    sql_template_set: R,
    /// Ties the executor type `T` to the store without storing a value of it.
    _phantom: PhantomData<T>,
}
107
/// Either a default executor or a transaction executor borrowed for `'a`, so the
/// key-value query code can run unchanged inside or outside a transaction.
pub enum ExecutorImpl<'a, T: Executor + 'a> {
    /// Non-transactional executor; `commit` on it is a no-op.
    Default(T),
    /// Transaction executor; `commit` commits the transaction.
    Txn(T::Transaction<'a>),
}
112
113impl<T: Executor> ExecutorImpl<'_, T> {
114    async fn query(&mut self, query: &str, params: &Vec<&Vec<u8>>) -> Result<Vec<KeyValue>> {
115        match self {
116            Self::Default(executor) => executor.query(query, params).await,
117            Self::Txn(executor) => executor.query(query, params).await,
118        }
119    }
120
121    #[warn(dead_code)] // Used in #[cfg(feature = "mysql_kvbackend")]
122    async fn execute(&mut self, query: &str, params: &Vec<&Vec<u8>>) -> Result<()> {
123        match self {
124            Self::Default(executor) => executor.execute(query, params).await,
125            Self::Txn(executor) => executor.execute(query, params).await,
126        }
127    }
128
129    async fn commit(self) -> Result<()> {
130        match self {
131            Self::Txn(executor) => executor.commit().await,
132            _ => Ok(()),
133        }
134    }
135}
136
137#[async_trait::async_trait]
138pub trait KvQueryExecutor<T: Executor> {
139    async fn range_with_query_executor(
140        &self,
141        query_executor: &mut ExecutorImpl<'_, T>,
142        req: RangeRequest,
143    ) -> Result<RangeResponse>;
144
145    async fn put_with_query_executor(
146        &self,
147        query_executor: &mut ExecutorImpl<'_, T>,
148        req: PutRequest,
149    ) -> Result<PutResponse> {
150        let kv = KeyValue {
151            key: req.key,
152            value: req.value,
153        };
154        let mut res = self
155            .batch_put_with_query_executor(
156                query_executor,
157                BatchPutRequest {
158                    kvs: vec![kv],
159                    prev_kv: req.prev_kv,
160                },
161            )
162            .await?;
163
164        if !res.prev_kvs.is_empty() {
165            debug_assert!(req.prev_kv);
166            return Ok(PutResponse {
167                prev_kv: Some(res.prev_kvs.remove(0)),
168            });
169        }
170        Ok(PutResponse::default())
171    }
172
173    async fn batch_put_with_query_executor(
174        &self,
175        query_executor: &mut ExecutorImpl<'_, T>,
176        req: BatchPutRequest,
177    ) -> Result<BatchPutResponse>;
178
179    /// Batch get with certain client. It's needed for a client with transaction.
180    async fn batch_get_with_query_executor(
181        &self,
182        query_executor: &mut ExecutorImpl<'_, T>,
183        req: BatchGetRequest,
184    ) -> Result<BatchGetResponse>;
185
186    async fn delete_range_with_query_executor(
187        &self,
188        query_executor: &mut ExecutorImpl<'_, T>,
189        req: DeleteRangeRequest,
190    ) -> Result<DeleteRangeResponse>;
191
192    async fn batch_delete_with_query_executor(
193        &self,
194        query_executor: &mut ExecutorImpl<'_, T>,
195        req: BatchDeleteRequest,
196    ) -> Result<BatchDeleteResponse>;
197}
198
199impl<T, S, R> RdsStore<T, S, R>
200where
201    Self: KvQueryExecutor<T> + Send + Sync,
202    T: Executor + Send + Sync,
203    S: ExecutorFactory<T> + Send + Sync,
204{
205    async fn execute_txn_cmp(
206        &self,
207        query_executor: &mut ExecutorImpl<'_, T>,
208        cmp: &[Compare],
209    ) -> Result<bool> {
210        let batch_get_req = BatchGetRequest {
211            keys: cmp.iter().map(|c| c.key.clone()).collect(),
212        };
213        let res = self
214            .batch_get_with_query_executor(query_executor, batch_get_req)
215            .await?;
216        debug!("batch get res: {:?}", res);
217        let res_map = res
218            .kvs
219            .into_iter()
220            .map(|kv| (kv.key, kv.value))
221            .collect::<HashMap<Vec<u8>, Vec<u8>>>();
222        for c in cmp {
223            let value = res_map.get(&c.key);
224            if !c.compare_value(value) {
225                return Ok(false);
226            }
227        }
228        Ok(true)
229    }
230
231    /// Execute a batch of transaction operations. This function is only used for transactions with the same operation type.
232    async fn try_batch_txn(
233        &self,
234        query_executor: &mut ExecutorImpl<'_, T>,
235        txn_ops: &[TxnOp],
236    ) -> Result<Option<Vec<TxnOpResponse>>> {
237        if !check_txn_ops(txn_ops)? {
238            return Ok(None);
239        }
240        // Safety: txn_ops is not empty
241        match txn_ops.first().unwrap() {
242            TxnOp::Delete(_) => self.handle_batch_delete(query_executor, txn_ops).await,
243            TxnOp::Put(_, _) => self.handle_batch_put(query_executor, txn_ops).await,
244            TxnOp::Get(_) => self.handle_batch_get(query_executor, txn_ops).await,
245        }
246    }
247
248    async fn handle_batch_delete(
249        &self,
250        query_executor: &mut ExecutorImpl<'_, T>,
251        txn_ops: &[TxnOp],
252    ) -> Result<Option<Vec<TxnOpResponse>>> {
253        let mut batch_del_req = BatchDeleteRequest {
254            keys: vec![],
255            prev_kv: true,
256        };
257        for op in txn_ops {
258            if let TxnOp::Delete(key) = op {
259                batch_del_req.keys.push(key.clone());
260            }
261        }
262        let res = self
263            .batch_delete_with_query_executor(query_executor, batch_del_req)
264            .await?;
265        let res_map = res
266            .prev_kvs
267            .into_iter()
268            .map(|kv| (kv.key, kv.value))
269            .collect::<HashMap<Vec<u8>, Vec<u8>>>();
270        let mut resps = Vec::with_capacity(txn_ops.len());
271        for op in txn_ops {
272            if let TxnOp::Delete(key) = op {
273                let value = res_map.get(key);
274                resps.push(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
275                    deleted: if value.is_some() { 1 } else { 0 },
276                    prev_kvs: vec![],
277                }));
278            }
279        }
280        Ok(Some(resps))
281    }
282
283    async fn handle_batch_put(
284        &self,
285        query_executor: &mut ExecutorImpl<'_, T>,
286        txn_ops: &[TxnOp],
287    ) -> Result<Option<Vec<TxnOpResponse>>> {
288        let mut batch_put_req = BatchPutRequest {
289            kvs: vec![],
290            prev_kv: false,
291        };
292        for op in txn_ops {
293            if let TxnOp::Put(key, value) = op {
294                batch_put_req.kvs.push(KeyValue {
295                    key: key.clone(),
296                    value: value.clone(),
297                });
298            }
299        }
300        let _ = self
301            .batch_put_with_query_executor(query_executor, batch_put_req)
302            .await?;
303        let mut resps = Vec::with_capacity(txn_ops.len());
304        for op in txn_ops {
305            if let TxnOp::Put(_, _) = op {
306                resps.push(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }));
307            }
308        }
309        Ok(Some(resps))
310    }
311
312    async fn handle_batch_get(
313        &self,
314        query_executor: &mut ExecutorImpl<'_, T>,
315        txn_ops: &[TxnOp],
316    ) -> Result<Option<Vec<TxnOpResponse>>> {
317        let mut batch_get_req = BatchGetRequest { keys: vec![] };
318        for op in txn_ops {
319            if let TxnOp::Get(key) = op {
320                batch_get_req.keys.push(key.clone());
321            }
322        }
323        let res = self
324            .batch_get_with_query_executor(query_executor, batch_get_req)
325            .await?;
326        let res_map = res
327            .kvs
328            .into_iter()
329            .map(|kv| (kv.key, kv.value))
330            .collect::<HashMap<Vec<u8>, Vec<u8>>>();
331        let mut resps = Vec::with_capacity(txn_ops.len());
332        for op in txn_ops {
333            if let TxnOp::Get(key) = op {
334                let value = res_map.get(key);
335                resps.push(TxnOpResponse::ResponseGet(RangeResponse {
336                    kvs: value
337                        .map(|v| {
338                            vec![KeyValue {
339                                key: key.clone(),
340                                value: v.clone(),
341                            }]
342                        })
343                        .unwrap_or_default(),
344                    more: false,
345                }));
346            }
347        }
348        Ok(Some(resps))
349    }
350
351    async fn execute_txn_op(
352        &self,
353        query_executor: &mut ExecutorImpl<'_, T>,
354        op: &TxnOp,
355    ) -> Result<TxnOpResponse> {
356        match op {
357            TxnOp::Put(key, value) => {
358                let res = self
359                    .put_with_query_executor(
360                        query_executor,
361                        PutRequest {
362                            key: key.clone(),
363                            value: value.clone(),
364                            prev_kv: false,
365                        },
366                    )
367                    .await?;
368                Ok(TxnOpResponse::ResponsePut(res))
369            }
370            TxnOp::Get(key) => {
371                let res = self
372                    .range_with_query_executor(
373                        query_executor,
374                        RangeRequest {
375                            key: key.clone(),
376                            range_end: vec![],
377                            limit: 1,
378                            keys_only: false,
379                        },
380                    )
381                    .await?;
382                Ok(TxnOpResponse::ResponseGet(res))
383            }
384            TxnOp::Delete(key) => {
385                let res = self
386                    .delete_range_with_query_executor(
387                        query_executor,
388                        DeleteRangeRequest {
389                            key: key.clone(),
390                            range_end: vec![],
391                            prev_kv: false,
392                        },
393                    )
394                    .await?;
395                Ok(TxnOpResponse::ResponseDelete(res))
396            }
397        }
398    }
399
400    async fn txn_inner(&self, txn: &KvTxn) -> Result<KvTxnResponse> {
401        let mut default_executor = self.executor_factory.default_executor().await?;
402        let mut txn_executor = ExecutorImpl::Txn(
403            self.executor_factory
404                .txn_executor(&mut default_executor)
405                .await?,
406        );
407        let mut success = true;
408        if txn.c_when {
409            success = self
410                .execute_txn_cmp(&mut txn_executor, &txn.req.compare)
411                .await?;
412        }
413        let mut responses = vec![];
414        if success && txn.c_then {
415            match self
416                .try_batch_txn(&mut txn_executor, &txn.req.success)
417                .await?
418            {
419                Some(res) => responses.extend(res),
420                None => {
421                    for txnop in &txn.req.success {
422                        let res = self.execute_txn_op(&mut txn_executor, txnop).await?;
423                        responses.push(res);
424                    }
425                }
426            }
427        } else if !success && txn.c_else {
428            match self
429                .try_batch_txn(&mut txn_executor, &txn.req.failure)
430                .await?
431            {
432                Some(res) => responses.extend(res),
433                None => {
434                    for txnop in &txn.req.failure {
435                        let res = self.execute_txn_op(&mut txn_executor, txnop).await?;
436                        responses.push(res);
437                    }
438                }
439            }
440        }
441
442        txn_executor.commit().await?;
443        Ok(KvTxnResponse {
444            responses,
445            succeeded: success,
446        })
447    }
448}
449
450#[async_trait::async_trait]
451impl<T, S, R> KvBackend for RdsStore<T, S, R>
452where
453    R: 'static,
454    Self: KvQueryExecutor<T> + Send + Sync,
455    T: Executor + 'static,
456    S: ExecutorFactory<T> + 'static,
457{
458    fn name(&self) -> &str {
459        T::name()
460    }
461
462    fn as_any(&self) -> &dyn Any {
463        self
464    }
465
466    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
467        let client = self.executor_factory.default_executor().await?;
468        let mut query_executor = ExecutorImpl::Default(client);
469        self.range_with_query_executor(&mut query_executor, req)
470            .await
471    }
472
473    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
474        let client = self.executor_factory.default_executor().await?;
475        let mut query_executor = ExecutorImpl::Default(client);
476        self.put_with_query_executor(&mut query_executor, req).await
477    }
478
479    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
480        let client = self.executor_factory.default_executor().await?;
481        let mut query_executor = ExecutorImpl::Default(client);
482        self.batch_put_with_query_executor(&mut query_executor, req)
483            .await
484    }
485
486    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
487        let client = self.executor_factory.default_executor().await?;
488        let mut query_executor = ExecutorImpl::Default(client);
489        self.batch_get_with_query_executor(&mut query_executor, req)
490            .await
491    }
492
493    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
494        let client = self.executor_factory.default_executor().await?;
495        let mut query_executor = ExecutorImpl::Default(client);
496        self.delete_range_with_query_executor(&mut query_executor, req)
497            .await
498    }
499
500    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
501        let client = self.executor_factory.default_executor().await?;
502        let mut query_executor = ExecutorImpl::Default(client);
503        self.batch_delete_with_query_executor(&mut query_executor, req)
504            .await
505    }
506}
507
508#[async_trait::async_trait]
509impl<T, S, R> TxnService for RdsStore<T, S, R>
510where
511    Self: KvQueryExecutor<T> + Send + Sync,
512    T: Executor + 'static,
513    S: ExecutorFactory<T> + 'static,
514{
515    type Error = Error;
516
517    async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
518        let _timer = METRIC_META_TXN_REQUEST
519            .with_label_values(&[T::name(), "txn"])
520            .start_timer();
521
522        let mut backoff = ExponentialBuilder::default()
523            .with_min_delay(Duration::from_millis(10))
524            .with_max_delay(Duration::from_millis(200))
525            .with_max_times(self.txn_retry_count)
526            .build();
527
528        loop {
529            match self.txn_inner(&txn).await {
530                Ok(res) => return Ok(res),
531                Err(e) => {
532                    if e.is_serialization_error() {
533                        let d = backoff.next();
534                        if let Some(d) = d {
535                            tokio::time::sleep(d).await;
536                            continue;
537                        }
538                        break;
539                    } else {
540                        return Err(e);
541                    }
542                }
543            }
544        }
545
546        RdsTransactionRetryFailedSnafu {}.fail()
547    }
548
549    fn max_txn_ops(&self) -> usize {
550        self.max_txn_ops
551    }
552}
553
554/// Checks if the transaction operations are the same type.
555fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> {
556    if txn_ops.is_empty() {
557        return Ok(false);
558    }
559    let same = txn_ops.windows(2).all(|a| {
560        matches!(
561            (&a[0], &a[1]),
562            (TxnOp::Put(_, _), TxnOp::Put(_, _))
563                | (TxnOp::Get(_), TxnOp::Get(_))
564                | (TxnOp::Delete(_), TxnOp::Delete(_))
565        )
566    });
567    Ok(same)
568}
569
/// Evaluates `$result` (a `Result`) and records the time it took, in milliseconds, in
/// `RDS_SQL_EXECUTE_ELAPSED`. The labels are store, outcome (`"success"` or `"error"`),
/// operation and type. The result is passed through unchanged.
#[macro_export]
macro_rules! record_rds_sql_execute_elapsed {
    ($result:expr, $label_store:expr, $label_op:expr, $label_type:expr) => {{
        let started_at = std::time::Instant::now();
        let outcome_result = $result;
        let outcome = if outcome_result.is_ok() {
            "success"
        } else {
            "error"
        };
        $crate::metrics::RDS_SQL_EXECUTE_ELAPSED
            .with_label_values(&[$label_store, outcome, $label_op, $label_type])
            .observe(started_at.elapsed().as_millis_f64());
        outcome_result
    }};
}