1use std::any::Any;
16use std::collections::HashMap;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use backon::{BackoffBuilder, ExponentialBuilder};
21use common_telemetry::debug;
22
23use crate::error::{Error, RdsTransactionRetryFailedSnafu, Result};
24use crate::kv_backend::txn::{
25 Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse,
26};
27use crate::kv_backend::{KvBackend, TxnService};
28use crate::metrics::METRIC_META_TXN_REQUEST;
29use crate::rpc::store::{
30 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
31 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
32 RangeRequest, RangeResponse,
33};
34use crate::rpc::KeyValue;
35
// Operation names for the RDS store. The generic code in this module does not
// read them; presumably the backend modules declared below use them (e.g. as
// the `op` label of `record_rds_sql_execute_elapsed!`) — TODO confirm.
const RDS_STORE_OP_BATCH_GET: &str = "batch_get";
const RDS_STORE_OP_BATCH_PUT: &str = "batch_put";
const RDS_STORE_OP_BATCH_DELETE: &str = "batch_delete";
const RDS_STORE_OP_RANGE_QUERY: &str = "range_query";
const RDS_STORE_OP_RANGE_DELETE: &str = "range_delete";

/// Transaction retry count for the RDS store. Presumably the value backend
/// constructors assign to `RdsStore::txn_retry_count` — TODO confirm.
const RDS_STORE_TXN_RETRY_COUNT: usize = 3;

// Backend implementations, each compiled only when its cargo feature is enabled.
#[cfg(feature = "pg_kvbackend")]
mod postgres;
#[cfg(feature = "pg_kvbackend")]
pub use postgres::PgStore;

#[cfg(feature = "mysql_kvbackend")]
mod mysql;
#[cfg(feature = "mysql_kvbackend")]
pub use mysql::MySqlStore;
53
54#[async_trait::async_trait]
56pub trait Executor: Send + Sync {
57 type Transaction<'a>: 'a + Transaction<'a>
58 where
59 Self: 'a;
60
61 fn name() -> &'static str;
62
63 async fn query(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>>;
64
65 async fn execute(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<()> {
67 self.query(query, params).await?;
68 Ok(())
69 }
70
71 async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>>;
72}
73
74#[async_trait::async_trait]
76pub trait Transaction<'a>: Send + Sync {
77 async fn query(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>>;
78
79 async fn execute(&mut self, query: &str, params: &[&Vec<u8>]) -> Result<()> {
80 self.query(query, params).await?;
81 Ok(())
82 }
83
84 async fn commit(self) -> Result<()>;
85}
86
87#[async_trait::async_trait]
89pub trait ExecutorFactory<T: Executor>: Send + Sync {
90 async fn default_executor(&self) -> Result<T>;
91
92 async fn txn_executor<'a>(&self, default_executor: &'a mut T) -> Result<T::Transaction<'a>>;
93}
94
95pub struct RdsStore<T, S, R>
97where
98 T: Executor + Send + Sync,
99 S: ExecutorFactory<T> + Send + Sync,
100{
101 max_txn_ops: usize,
102 txn_retry_count: usize,
103 executor_factory: S,
104 sql_template_set: R,
105 _phantom: PhantomData<T>,
106}
107
108pub enum ExecutorImpl<'a, T: Executor + 'a> {
109 Default(T),
110 Txn(T::Transaction<'a>),
111}
112
113impl<T: Executor> ExecutorImpl<'_, T> {
114 async fn query(&mut self, query: &str, params: &Vec<&Vec<u8>>) -> Result<Vec<KeyValue>> {
115 match self {
116 Self::Default(executor) => executor.query(query, params).await,
117 Self::Txn(executor) => executor.query(query, params).await,
118 }
119 }
120
121 #[warn(dead_code)] async fn execute(&mut self, query: &str, params: &Vec<&Vec<u8>>) -> Result<()> {
123 match self {
124 Self::Default(executor) => executor.execute(query, params).await,
125 Self::Txn(executor) => executor.execute(query, params).await,
126 }
127 }
128
129 async fn commit(self) -> Result<()> {
130 match self {
131 Self::Txn(executor) => executor.commit().await,
132 _ => Ok(()),
133 }
134 }
135}
136
137#[async_trait::async_trait]
138pub trait KvQueryExecutor<T: Executor> {
139 async fn range_with_query_executor(
140 &self,
141 query_executor: &mut ExecutorImpl<'_, T>,
142 req: RangeRequest,
143 ) -> Result<RangeResponse>;
144
145 async fn put_with_query_executor(
146 &self,
147 query_executor: &mut ExecutorImpl<'_, T>,
148 req: PutRequest,
149 ) -> Result<PutResponse> {
150 let kv = KeyValue {
151 key: req.key,
152 value: req.value,
153 };
154 let mut res = self
155 .batch_put_with_query_executor(
156 query_executor,
157 BatchPutRequest {
158 kvs: vec![kv],
159 prev_kv: req.prev_kv,
160 },
161 )
162 .await?;
163
164 if !res.prev_kvs.is_empty() {
165 debug_assert!(req.prev_kv);
166 return Ok(PutResponse {
167 prev_kv: Some(res.prev_kvs.remove(0)),
168 });
169 }
170 Ok(PutResponse::default())
171 }
172
173 async fn batch_put_with_query_executor(
174 &self,
175 query_executor: &mut ExecutorImpl<'_, T>,
176 req: BatchPutRequest,
177 ) -> Result<BatchPutResponse>;
178
179 async fn batch_get_with_query_executor(
181 &self,
182 query_executor: &mut ExecutorImpl<'_, T>,
183 req: BatchGetRequest,
184 ) -> Result<BatchGetResponse>;
185
186 async fn delete_range_with_query_executor(
187 &self,
188 query_executor: &mut ExecutorImpl<'_, T>,
189 req: DeleteRangeRequest,
190 ) -> Result<DeleteRangeResponse>;
191
192 async fn batch_delete_with_query_executor(
193 &self,
194 query_executor: &mut ExecutorImpl<'_, T>,
195 req: BatchDeleteRequest,
196 ) -> Result<BatchDeleteResponse>;
197}
198
199impl<T, S, R> RdsStore<T, S, R>
200where
201 Self: KvQueryExecutor<T> + Send + Sync,
202 T: Executor + Send + Sync,
203 S: ExecutorFactory<T> + Send + Sync,
204{
205 async fn execute_txn_cmp(
206 &self,
207 query_executor: &mut ExecutorImpl<'_, T>,
208 cmp: &[Compare],
209 ) -> Result<bool> {
210 let batch_get_req = BatchGetRequest {
211 keys: cmp.iter().map(|c| c.key.clone()).collect(),
212 };
213 let res = self
214 .batch_get_with_query_executor(query_executor, batch_get_req)
215 .await?;
216 debug!("batch get res: {:?}", res);
217 let res_map = res
218 .kvs
219 .into_iter()
220 .map(|kv| (kv.key, kv.value))
221 .collect::<HashMap<Vec<u8>, Vec<u8>>>();
222 for c in cmp {
223 let value = res_map.get(&c.key);
224 if !c.compare_value(value) {
225 return Ok(false);
226 }
227 }
228 Ok(true)
229 }
230
231 async fn try_batch_txn(
233 &self,
234 query_executor: &mut ExecutorImpl<'_, T>,
235 txn_ops: &[TxnOp],
236 ) -> Result<Option<Vec<TxnOpResponse>>> {
237 if !check_txn_ops(txn_ops)? {
238 return Ok(None);
239 }
240 match txn_ops.first().unwrap() {
242 TxnOp::Delete(_) => self.handle_batch_delete(query_executor, txn_ops).await,
243 TxnOp::Put(_, _) => self.handle_batch_put(query_executor, txn_ops).await,
244 TxnOp::Get(_) => self.handle_batch_get(query_executor, txn_ops).await,
245 }
246 }
247
248 async fn handle_batch_delete(
249 &self,
250 query_executor: &mut ExecutorImpl<'_, T>,
251 txn_ops: &[TxnOp],
252 ) -> Result<Option<Vec<TxnOpResponse>>> {
253 let mut batch_del_req = BatchDeleteRequest {
254 keys: vec![],
255 prev_kv: true,
256 };
257 for op in txn_ops {
258 if let TxnOp::Delete(key) = op {
259 batch_del_req.keys.push(key.clone());
260 }
261 }
262 let res = self
263 .batch_delete_with_query_executor(query_executor, batch_del_req)
264 .await?;
265 let res_map = res
266 .prev_kvs
267 .into_iter()
268 .map(|kv| (kv.key, kv.value))
269 .collect::<HashMap<Vec<u8>, Vec<u8>>>();
270 let mut resps = Vec::with_capacity(txn_ops.len());
271 for op in txn_ops {
272 if let TxnOp::Delete(key) = op {
273 let value = res_map.get(key);
274 resps.push(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
275 deleted: if value.is_some() { 1 } else { 0 },
276 prev_kvs: vec![],
277 }));
278 }
279 }
280 Ok(Some(resps))
281 }
282
283 async fn handle_batch_put(
284 &self,
285 query_executor: &mut ExecutorImpl<'_, T>,
286 txn_ops: &[TxnOp],
287 ) -> Result<Option<Vec<TxnOpResponse>>> {
288 let mut batch_put_req = BatchPutRequest {
289 kvs: vec![],
290 prev_kv: false,
291 };
292 for op in txn_ops {
293 if let TxnOp::Put(key, value) = op {
294 batch_put_req.kvs.push(KeyValue {
295 key: key.clone(),
296 value: value.clone(),
297 });
298 }
299 }
300 let _ = self
301 .batch_put_with_query_executor(query_executor, batch_put_req)
302 .await?;
303 let mut resps = Vec::with_capacity(txn_ops.len());
304 for op in txn_ops {
305 if let TxnOp::Put(_, _) = op {
306 resps.push(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }));
307 }
308 }
309 Ok(Some(resps))
310 }
311
312 async fn handle_batch_get(
313 &self,
314 query_executor: &mut ExecutorImpl<'_, T>,
315 txn_ops: &[TxnOp],
316 ) -> Result<Option<Vec<TxnOpResponse>>> {
317 let mut batch_get_req = BatchGetRequest { keys: vec![] };
318 for op in txn_ops {
319 if let TxnOp::Get(key) = op {
320 batch_get_req.keys.push(key.clone());
321 }
322 }
323 let res = self
324 .batch_get_with_query_executor(query_executor, batch_get_req)
325 .await?;
326 let res_map = res
327 .kvs
328 .into_iter()
329 .map(|kv| (kv.key, kv.value))
330 .collect::<HashMap<Vec<u8>, Vec<u8>>>();
331 let mut resps = Vec::with_capacity(txn_ops.len());
332 for op in txn_ops {
333 if let TxnOp::Get(key) = op {
334 let value = res_map.get(key);
335 resps.push(TxnOpResponse::ResponseGet(RangeResponse {
336 kvs: value
337 .map(|v| {
338 vec![KeyValue {
339 key: key.clone(),
340 value: v.clone(),
341 }]
342 })
343 .unwrap_or_default(),
344 more: false,
345 }));
346 }
347 }
348 Ok(Some(resps))
349 }
350
351 async fn execute_txn_op(
352 &self,
353 query_executor: &mut ExecutorImpl<'_, T>,
354 op: &TxnOp,
355 ) -> Result<TxnOpResponse> {
356 match op {
357 TxnOp::Put(key, value) => {
358 let res = self
359 .put_with_query_executor(
360 query_executor,
361 PutRequest {
362 key: key.clone(),
363 value: value.clone(),
364 prev_kv: false,
365 },
366 )
367 .await?;
368 Ok(TxnOpResponse::ResponsePut(res))
369 }
370 TxnOp::Get(key) => {
371 let res = self
372 .range_with_query_executor(
373 query_executor,
374 RangeRequest {
375 key: key.clone(),
376 range_end: vec![],
377 limit: 1,
378 keys_only: false,
379 },
380 )
381 .await?;
382 Ok(TxnOpResponse::ResponseGet(res))
383 }
384 TxnOp::Delete(key) => {
385 let res = self
386 .delete_range_with_query_executor(
387 query_executor,
388 DeleteRangeRequest {
389 key: key.clone(),
390 range_end: vec![],
391 prev_kv: false,
392 },
393 )
394 .await?;
395 Ok(TxnOpResponse::ResponseDelete(res))
396 }
397 }
398 }
399
400 async fn txn_inner(&self, txn: &KvTxn) -> Result<KvTxnResponse> {
401 let mut default_executor = self.executor_factory.default_executor().await?;
402 let mut txn_executor = ExecutorImpl::Txn(
403 self.executor_factory
404 .txn_executor(&mut default_executor)
405 .await?,
406 );
407 let mut success = true;
408 if txn.c_when {
409 success = self
410 .execute_txn_cmp(&mut txn_executor, &txn.req.compare)
411 .await?;
412 }
413 let mut responses = vec![];
414 if success && txn.c_then {
415 match self
416 .try_batch_txn(&mut txn_executor, &txn.req.success)
417 .await?
418 {
419 Some(res) => responses.extend(res),
420 None => {
421 for txnop in &txn.req.success {
422 let res = self.execute_txn_op(&mut txn_executor, txnop).await?;
423 responses.push(res);
424 }
425 }
426 }
427 } else if !success && txn.c_else {
428 match self
429 .try_batch_txn(&mut txn_executor, &txn.req.failure)
430 .await?
431 {
432 Some(res) => responses.extend(res),
433 None => {
434 for txnop in &txn.req.failure {
435 let res = self.execute_txn_op(&mut txn_executor, txnop).await?;
436 responses.push(res);
437 }
438 }
439 }
440 }
441
442 txn_executor.commit().await?;
443 Ok(KvTxnResponse {
444 responses,
445 succeeded: success,
446 })
447 }
448}
449
450#[async_trait::async_trait]
451impl<T, S, R> KvBackend for RdsStore<T, S, R>
452where
453 R: 'static,
454 Self: KvQueryExecutor<T> + Send + Sync,
455 T: Executor + 'static,
456 S: ExecutorFactory<T> + 'static,
457{
458 fn name(&self) -> &str {
459 T::name()
460 }
461
462 fn as_any(&self) -> &dyn Any {
463 self
464 }
465
466 async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
467 let client = self.executor_factory.default_executor().await?;
468 let mut query_executor = ExecutorImpl::Default(client);
469 self.range_with_query_executor(&mut query_executor, req)
470 .await
471 }
472
473 async fn put(&self, req: PutRequest) -> Result<PutResponse> {
474 let client = self.executor_factory.default_executor().await?;
475 let mut query_executor = ExecutorImpl::Default(client);
476 self.put_with_query_executor(&mut query_executor, req).await
477 }
478
479 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
480 let client = self.executor_factory.default_executor().await?;
481 let mut query_executor = ExecutorImpl::Default(client);
482 self.batch_put_with_query_executor(&mut query_executor, req)
483 .await
484 }
485
486 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
487 let client = self.executor_factory.default_executor().await?;
488 let mut query_executor = ExecutorImpl::Default(client);
489 self.batch_get_with_query_executor(&mut query_executor, req)
490 .await
491 }
492
493 async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
494 let client = self.executor_factory.default_executor().await?;
495 let mut query_executor = ExecutorImpl::Default(client);
496 self.delete_range_with_query_executor(&mut query_executor, req)
497 .await
498 }
499
500 async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
501 let client = self.executor_factory.default_executor().await?;
502 let mut query_executor = ExecutorImpl::Default(client);
503 self.batch_delete_with_query_executor(&mut query_executor, req)
504 .await
505 }
506}
507
508#[async_trait::async_trait]
509impl<T, S, R> TxnService for RdsStore<T, S, R>
510where
511 Self: KvQueryExecutor<T> + Send + Sync,
512 T: Executor + 'static,
513 S: ExecutorFactory<T> + 'static,
514{
515 type Error = Error;
516
517 async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
518 let _timer = METRIC_META_TXN_REQUEST
519 .with_label_values(&[T::name(), "txn"])
520 .start_timer();
521
522 let mut backoff = ExponentialBuilder::default()
523 .with_min_delay(Duration::from_millis(10))
524 .with_max_delay(Duration::from_millis(200))
525 .with_max_times(self.txn_retry_count)
526 .build();
527
528 loop {
529 match self.txn_inner(&txn).await {
530 Ok(res) => return Ok(res),
531 Err(e) => {
532 if e.is_serialization_error() {
533 let d = backoff.next();
534 if let Some(d) = d {
535 tokio::time::sleep(d).await;
536 continue;
537 }
538 break;
539 } else {
540 return Err(e);
541 }
542 }
543 }
544 }
545
546 RdsTransactionRetryFailedSnafu {}.fail()
547 }
548
549 fn max_txn_ops(&self) -> usize {
550 self.max_txn_ops
551 }
552}
553
554fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> {
556 if txn_ops.is_empty() {
557 return Ok(false);
558 }
559 let same = txn_ops.windows(2).all(|a| {
560 matches!(
561 (&a[0], &a[1]),
562 (TxnOp::Put(_, _), TxnOp::Put(_, _))
563 | (TxnOp::Get(_), TxnOp::Get(_))
564 | (TxnOp::Delete(_), TxnOp::Delete(_))
565 )
566 });
567 Ok(same)
568}
569
/// Evaluates `$result` (a `Result`) and records the elapsed time in
/// milliseconds in `RDS_SQL_EXECUTE_ELAPSED`, then yields the result unchanged.
///
/// The labels are the store, the outcome (`"success"` or `"error"`), the op
/// and the type, in that order.
#[macro_export]
macro_rules! record_rds_sql_execute_elapsed {
    ($result:expr, $label_store:expr, $label_op:expr, $label_type:expr) => {{
        let started = std::time::Instant::now();
        let result = $result;
        let outcome = if result.is_ok() { "success" } else { "error" };
        $crate::metrics::RDS_SQL_EXECUTE_ELAPSED
            .with_label_values(&[$label_store, outcome, $label_op, $label_type])
            .observe(started.elapsed().as_millis_f64());
        result
    }};
}