1use std::marker::PhantomData;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use snafu::ResultExt;
20use sqlx::mysql::MySqlRow;
21use sqlx::pool::Pool;
22use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
23use strum::AsRefStr;
24
25use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
26use crate::kv_backend::rds::{
27 Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
28 RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT,
29 RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::rpc::store::{
33 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
34 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
35};
36use crate::rpc::KeyValue;
37
/// Store name handed to `record_rds_sql_execute_elapsed!` alongside every statement this
/// backend executes.
const MYSQL_STORE_NAME: &str = "mysql_store";
39
/// Non-transactional executor: a reference-counted sqlx MySQL connection pool.
type MySqlClient = Arc<Pool<MySql>>;
/// Transactional executor: a newtype around an owned (`'static`) sqlx MySQL transaction.
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
42
43fn key_value_from_row(row: MySqlRow) -> KeyValue {
44 KeyValue {
46 key: row.get_unchecked(0),
47 value: row.get_unchecked(1),
48 }
49}
50
/// Sentinel compared against `key` / `range_end` by `range_template`.
///
/// Despite the name this is a single zero byte, not an empty slice: a `range_end` equal to
/// it selects the `LeftBounded` template (or `Full` when `key` equals it as well).
const EMPTY: &[u8] = &[0];
52
/// The kinds of range statement this store issues; `range_template` selects one per request.
///
/// `AsRefStr` is derived so the variant can be passed as a string to
/// `record_rds_sql_execute_elapsed!`.
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
    /// Exact match on a single key (`k = ?`).
    Point,
    /// Half-open interval (`k >= ? AND k < ?`).
    Range,
    /// Every row of the table; takes no parameters.
    Full,
    /// Every key at or after a lower bound (`k >= ?`).
    LeftBounded,
    /// Every key matching a `LIKE` pattern derived from the key (`k LIKE ?`).
    Prefix,
}
62
63impl RangeTemplateType {
65 fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
68 match self {
69 RangeTemplateType::Point => vec![key],
70 RangeTemplateType::Range => vec![key, range_end],
71 RangeTemplateType::Full => vec![],
72 RangeTemplateType::LeftBounded => vec![key],
73 RangeTemplateType::Prefix => {
74 key.push(b'%');
75 vec![key]
76 }
77 }
78 }
79}
80
/// One SQL statement text per [`RangeTemplateType`] variant.
///
/// Two instances exist in `MySqlTemplateSet`: one holding `SELECT` statements and one
/// holding `DELETE` statements.
#[derive(Debug, Clone)]
struct RangeTemplate {
    /// Statement used for `RangeTemplateType::Point`.
    point: String,
    /// Statement used for `RangeTemplateType::Range`.
    range: String,
    /// Statement used for `RangeTemplateType::Full`.
    full: String,
    /// Statement used for `RangeTemplateType::LeftBounded`.
    left_bounded: String,
    /// Statement used for `RangeTemplateType::Prefix`.
    prefix: String,
}
90
91impl RangeTemplate {
92 fn get(&self, typ: RangeTemplateType) -> &str {
94 match typ {
95 RangeTemplateType::Point => &self.point,
96 RangeTemplateType::Range => &self.range,
97 RangeTemplateType::Full => &self.full,
98 RangeTemplateType::LeftBounded => &self.left_bounded,
99 RangeTemplateType::Prefix => &self.prefix,
100 }
101 }
102
103 fn with_limit(template: &str, limit: i64) -> String {
105 if limit == 0 {
106 return format!("{};", template);
107 }
108 format!("{} LIMIT {};", template, limit)
109 }
110}
111
/// Reports whether `[start, end)` is a prefix range, i.e. `end` is `start` with its last
/// byte incremented by one.
///
/// Returns `false` when the slices differ in length, when both are empty, or when the last
/// byte of `start` is `0xFF` (it has no successor, so no same-length `end` can exist).
/// The last two cases previously panicked in debug builds because of unchecked `l - 1`
/// and `+ 1` arithmetic.
fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
    if start.len() != end.len() {
        return false;
    }
    match (start.split_last(), end.split_last()) {
        (Some((start_last, start_head)), Some((end_last, end_head))) => {
            // `checked_add` yields `None` for 0xFF, which can never equal `Some(_)`.
            start_head == end_head && start_last.checked_add(1) == Some(*end_last)
        }
        // Both slices are empty (their lengths are equal at this point).
        _ => false,
    }
}
123
124fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
126 match (key, range_end) {
127 (_, &[]) => RangeTemplateType::Point,
128 (EMPTY, EMPTY) => RangeTemplateType::Full,
129 (_, EMPTY) => RangeTemplateType::LeftBounded,
130 (start, end) => {
131 if is_prefix_range(start, end) {
132 RangeTemplateType::Prefix
133 } else {
134 RangeTemplateType::Range
135 }
136 }
137 }
138}
139
/// Produces one `"?"` placeholder for every index in the inclusive range `from..=to`.
///
/// MySQL placeholders are unnumbered, so only the size of the range matters; an empty
/// range (`from > to`) yields an empty vector.
fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
    let mut placeholders = Vec::new();
    for _ in from..=to {
        placeholders.push(String::from("?"));
    }
    placeholders
}
144
/// Builds the [`MySqlTemplateSet`] for one table.
struct MySqlTemplateFactory<'a> {
    /// Name of the backing table; interpolated between backticks into every statement.
    table_name: &'a str,
}
149
150impl<'a> MySqlTemplateFactory<'a> {
151 fn new(table_name: &'a str) -> Self {
153 Self { table_name }
154 }
155
156 fn build(&self) -> MySqlTemplateSet {
158 let table_name = self.table_name;
159 MySqlTemplateSet {
161 table_name: table_name.to_string(),
162 create_table_statement: format!(
163 "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
165 ),
166 range_template: RangeTemplate {
167 point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
168 range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
169 full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
170 left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
171 prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
172 },
173 delete_template: RangeTemplate {
174 point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
175 range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
176 full: format!("DELETE FROM `{table_name}`"),
177 left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
178 prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
179 },
180 }
181 }
182}
183
/// All SQL text used by the MySQL store for one table.
#[derive(Debug, Clone)]
pub struct MySqlTemplateSet {
    /// Table name, kept so the batch statements can be rendered per request size.
    table_name: String,
    /// `CREATE TABLE IF NOT EXISTS ...` statement for the backing table.
    create_table_statement: String,
    /// `SELECT` statements, one per range kind.
    range_template: RangeTemplate,
    /// `DELETE` statements, one per range kind.
    delete_template: RangeTemplate,
}
192
193impl MySqlTemplateSet {
194 fn generate_batch_get_query(&self, key_len: usize) -> String {
196 let table_name = &self.table_name;
197 let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
198 format!(
199 "SELECT k, v FROM `{table_name}` WHERE k in ({});",
200 in_clause
201 )
202 }
203
204 fn generate_batch_delete_query(&self, key_len: usize) -> String {
206 let table_name = &self.table_name;
207 let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
208 format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
209 }
210
211 fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
214 let table_name = &self.table_name;
215 let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
216 let in_clause = in_placeholders.join(", ");
217 let mut values_placeholders = Vec::new();
218 for _ in 0..kv_len {
219 values_placeholders.push("(?, ?)".to_string());
220 }
221 let values_clause = values_placeholders.join(", ");
222
223 (
224 format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
225 format!(
226 r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
227 ),
228 )
229 }
230}
231
232#[async_trait::async_trait]
233impl Executor for MySqlClient {
234 type Transaction<'a>
235 = MySqlTxnClient
236 where
237 Self: 'a;
238
239 fn name() -> &'static str {
240 "MySql"
241 }
242
243 async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
244 let query = sqlx::query(raw_query);
245 let query = params.iter().fold(query, |query, param| query.bind(param));
246 let rows = query
247 .fetch_all(&**self)
248 .await
249 .context(MySqlExecutionSnafu { sql: raw_query })?;
250 Ok(rows.into_iter().map(key_value_from_row).collect())
251 }
252
253 async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
254 let query = sqlx::query(raw_query);
255 let query = params.iter().fold(query, |query, param| query.bind(param));
256 query
257 .execute(&**self)
258 .await
259 .context(MySqlExecutionSnafu { sql: raw_query })?;
260 Ok(())
261 }
262
263 async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
264 sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
267 .execute(&**self)
268 .await
269 .context(MySqlExecutionSnafu {
270 sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
271 })?;
272 let txn = self
273 .begin()
274 .await
275 .context(MySqlExecutionSnafu { sql: "begin" })?;
276 Ok(MySqlTxnClient(txn))
277 }
278}
279
280#[async_trait::async_trait]
281impl Transaction<'_> for MySqlTxnClient {
282 async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
283 let query = sqlx::query(raw_query);
284 let query = params.iter().fold(query, |query, param| query.bind(param));
285 let rows = query
287 .fetch_all(&mut *(self.0))
288 .await
289 .context(MySqlExecutionSnafu { sql: raw_query })?;
290 Ok(rows.into_iter().map(key_value_from_row).collect())
291 }
292
293 async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
294 let query = sqlx::query(raw_query);
295 let query = params.iter().fold(query, |query, param| query.bind(param));
296 query
298 .execute(&mut *(self.0))
299 .await
300 .context(MySqlExecutionSnafu { sql: raw_query })?;
301 Ok(())
302 }
303
304 async fn commit(self) -> Result<()> {
307 self.0.commit().await.context(MySqlTransactionSnafu {
308 operation: "commit",
309 })?;
310 Ok(())
311 }
312}
313
/// Hands out executors backed by one shared MySQL connection pool.
pub struct MySqlExecutorFactory {
    /// The shared pool; cloned (reference-count bump) for every default executor.
    pool: Arc<Pool<MySql>>,
}
317
318#[async_trait::async_trait]
319impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
320 async fn default_executor(&self) -> Result<MySqlClient> {
321 Ok(self.pool.clone())
322 }
323
324 async fn txn_executor<'a>(
325 &self,
326 default_executor: &'a mut MySqlClient,
327 ) -> Result<MySqlTxnClient> {
328 default_executor.txn_executor().await
329 }
330}
331
/// The generic `RdsStore` specialised with the MySQL client, executor factory and SQL
/// template set defined in this file.
pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
335
336#[async_trait::async_trait]
337impl KvQueryExecutor<MySqlClient> for MySqlStore {
338 async fn range_with_query_executor(
339 &self,
340 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
341 req: RangeRequest,
342 ) -> Result<RangeResponse> {
343 let template_type = range_template(&req.key, &req.range_end);
344 let template = self.sql_template_set.range_template.get(template_type);
345 let params = template_type.build_params(req.key, req.range_end);
346 let params_ref = params.iter().collect::<Vec<_>>();
347 let query =
349 RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
350 let limit = req.limit as usize;
351 debug!("query: {:?}, params: {:?}", query, params);
352 let mut kvs = crate::record_rds_sql_execute_elapsed!(
353 query_executor.query(&query, ¶ms_ref).await,
354 MYSQL_STORE_NAME,
355 RDS_STORE_OP_RANGE_QUERY,
356 template_type.as_ref()
357 )?;
358 if req.keys_only {
359 kvs.iter_mut().for_each(|kv| kv.value = vec![]);
360 }
361 if limit == 0 || kvs.len() <= limit {
363 return Ok(RangeResponse { kvs, more: false });
364 }
365 let removed = kvs.pop();
367 debug_assert!(removed.is_some());
368 Ok(RangeResponse { kvs, more: true })
369 }
370
371 async fn batch_put_with_query_executor(
372 &self,
373 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
374 req: BatchPutRequest,
375 ) -> Result<BatchPutResponse> {
376 let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
377 let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
378
379 for kv in &req.kvs {
380 let processed_key = &kv.key;
381 in_params.push(processed_key);
382
383 let processed_value = &kv.value;
384 values_params.push(processed_key);
385 values_params.push(processed_value);
386 }
387 let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
388 let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
389 let (select, update) = self
390 .sql_template_set
391 .generate_batch_upsert_query(req.kvs.len());
392
393 if !req.prev_kv {
395 crate::record_rds_sql_execute_elapsed!(
396 query_executor.execute(&update, &values_params).await,
397 MYSQL_STORE_NAME,
398 RDS_STORE_OP_BATCH_PUT,
399 ""
400 )?;
401 return Ok(BatchPutResponse::default());
402 }
403 if let ExecutorImpl::Default(query_executor) = query_executor {
405 let txn = query_executor.txn_executor().await?;
406 let mut txn = ExecutorImpl::Txn(txn);
407 let res = self.batch_put_with_query_executor(&mut txn, req).await;
408 txn.commit().await?;
409 return res;
410 }
411 let prev_kvs = crate::record_rds_sql_execute_elapsed!(
412 query_executor.query(&select, &in_params).await,
413 MYSQL_STORE_NAME,
414 RDS_STORE_OP_BATCH_PUT,
415 ""
416 )?;
417 query_executor.execute(&update, &values_params).await?;
418 Ok(BatchPutResponse { prev_kvs })
419 }
420
421 async fn batch_get_with_query_executor(
422 &self,
423 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
424 req: BatchGetRequest,
425 ) -> Result<BatchGetResponse> {
426 if req.keys.is_empty() {
427 return Ok(BatchGetResponse { kvs: vec![] });
428 }
429 let query = self
430 .sql_template_set
431 .generate_batch_get_query(req.keys.len());
432 let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
433 let kvs = crate::record_rds_sql_execute_elapsed!(
434 query_executor.query(&query, ¶ms).await,
435 MYSQL_STORE_NAME,
436 RDS_STORE_OP_BATCH_GET,
437 ""
438 )?;
439 Ok(BatchGetResponse { kvs })
440 }
441
442 async fn delete_range_with_query_executor(
443 &self,
444 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
445 req: DeleteRangeRequest,
446 ) -> Result<DeleteRangeResponse> {
447 if let ExecutorImpl::Default(query_executor) = query_executor {
450 let txn = query_executor.txn_executor().await?;
451 let mut txn = ExecutorImpl::Txn(txn);
452 let res = self.delete_range_with_query_executor(&mut txn, req).await;
453 txn.commit().await?;
454 return res;
455 }
456 let range_get_req = RangeRequest {
457 key: req.key.clone(),
458 range_end: req.range_end.clone(),
459 limit: 0,
460 keys_only: false,
461 };
462 let prev_kvs = self
463 .range_with_query_executor(query_executor, range_get_req)
464 .await?
465 .kvs;
466 let template_type = range_template(&req.key, &req.range_end);
467 let template = self.sql_template_set.delete_template.get(template_type);
468 let params = template_type.build_params(req.key, req.range_end);
469 let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
470 crate::record_rds_sql_execute_elapsed!(
471 query_executor.execute(template, ¶ms_ref).await,
472 MYSQL_STORE_NAME,
473 RDS_STORE_OP_RANGE_DELETE,
474 template_type.as_ref()
475 )?;
476 let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
477 if req.prev_kv {
478 resp.with_prev_kvs(prev_kvs);
479 }
480 Ok(resp)
481 }
482
483 async fn batch_delete_with_query_executor(
484 &self,
485 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
486 req: BatchDeleteRequest,
487 ) -> Result<BatchDeleteResponse> {
488 if req.keys.is_empty() {
489 return Ok(BatchDeleteResponse::default());
490 }
491 let query = self
492 .sql_template_set
493 .generate_batch_delete_query(req.keys.len());
494 let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
495 if !req.prev_kv {
497 crate::record_rds_sql_execute_elapsed!(
498 query_executor.execute(&query, ¶ms).await,
499 MYSQL_STORE_NAME,
500 RDS_STORE_OP_BATCH_DELETE,
501 ""
502 )?;
503 return Ok(BatchDeleteResponse::default());
504 }
505 if let ExecutorImpl::Default(query_executor) = query_executor {
507 let txn = query_executor.txn_executor().await?;
508 let mut txn = ExecutorImpl::Txn(txn);
509 let res = self.batch_delete_with_query_executor(&mut txn, req).await;
510 txn.commit().await?;
511 return res;
512 }
513 let batch_get_req = BatchGetRequest {
515 keys: req.keys.clone(),
516 };
517 let prev_kvs = self
518 .batch_get_with_query_executor(query_executor, batch_get_req)
519 .await?
520 .kvs;
521 crate::record_rds_sql_execute_elapsed!(
523 query_executor.execute(&query, ¶ms).await,
524 MYSQL_STORE_NAME,
525 RDS_STORE_OP_BATCH_DELETE,
526 ""
527 )?;
528 if req.prev_kv {
529 Ok(BatchDeleteResponse { prev_kvs })
530 } else {
531 Ok(BatchDeleteResponse::default())
532 }
533 }
534}
535
536impl MySqlStore {
537 pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
539 let pool = MySqlPool::connect(url)
540 .await
541 .context(CreateMySqlPoolSnafu)?;
542 Self::with_mysql_pool(pool, table_name, max_txn_ops).await
543 }
544
545 pub async fn with_mysql_pool(
547 pool: Pool<MySql>,
548 table_name: &str,
549 max_txn_ops: usize,
550 ) -> Result<KvBackendRef> {
551 let sql_template_set = MySqlTemplateFactory::new(table_name).build();
555 sqlx::query(&sql_template_set.create_table_statement)
556 .execute(&pool)
557 .await
558 .context(MySqlExecutionSnafu {
559 sql: sql_template_set.create_table_statement.to_string(),
560 })?;
561 Ok(Arc::new(MySqlStore {
562 max_txn_ops,
563 sql_template_set,
564 txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
565 executor_factory: MySqlExecutorFactory {
566 pool: Arc::new(pool),
567 },
568 _phantom: PhantomData,
569 }))
570 }
571}
572
// Integration tests for the MySQL store. Each test builds its own table and delegates to
// the shared helpers in `crate::kv_backend::test`.
// Every test starts with `maybe_skip_mysql_integration_test!()` (presumably an early return
// when no MySQL endpoint is configured — confirm against the macro definition).
#[cfg(test)]
mod tests {
    use common_telemetry::init_default_ut_logging;

    use super::*;
    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
        test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
        text_txn_multi_compare_op, unprepare_kv,
    };
    use crate::maybe_skip_mysql_integration_test;

    /// Builds a store on table `table_name`, creating the table if needed.
    ///
    /// Returns `None` when the `GT_MYSQL_ENDPOINTS` environment variable is unset or
    /// empty; panics when the connection or the table creation fails.
    async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
        init_default_ut_logging();
        let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
        if endpoints.is_empty() {
            return None;
        }
        let pool = MySqlPool::connect(&endpoints).await.unwrap();
        let sql_templates = MySqlTemplateFactory::new(table_name).build();
        sqlx::query(&sql_templates.create_table_statement)
            .execute(&pool)
            .await
            .unwrap();
        Some(MySqlStore {
            max_txn_ops: 128,
            sql_template_set: sql_templates,
            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
            executor_factory: MySqlExecutorFactory {
                pool: Arc::new(pool),
            },
            _phantom: PhantomData,
        })
    }

    /// Runs the shared put test under the `put/` prefix.
    #[tokio::test]
    async fn test_mysql_put() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
        let prefix = b"put/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the shared range test under the `range/` prefix.
    #[tokio::test]
    async fn test_mysql_range() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
        let prefix = b"range/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the second shared range test; unlike the others it does not call
    /// `prepare_kv_with_prefix` first.
    #[tokio::test]
    async fn test_mysql_range_2() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
        let prefix = b"range2/";
        test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the shared simple range test with an empty prefix.
    #[tokio::test]
    async fn test_mysql_all_range() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap();
        let prefix = b"";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_simple_kv_range(&kv_backend).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the shared batch-get test under the `batch_get/` prefix.
    #[tokio::test]
    async fn test_mysql_batch_get() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
        let prefix = b"batch_get/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    // NOTE(review): despite its name this test runs the delete-*range* helper, the same
    // one `test_mysql_delete_range` runs; this looks like a copy-paste — confirm whether a
    // batch-delete helper was intended here.
    #[tokio::test]
    async fn test_mysql_batch_delete() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
        let prefix = b"batch_delete/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the shared batch-delete test under the `batch_delete/` prefix.
    #[tokio::test]
    async fn test_mysql_batch_delete_with_prefix() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
            .await
            .unwrap();
        let prefix = b"batch_delete/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the shared delete-range test under the `delete_range/` prefix.
    #[tokio::test]
    async fn test_mysql_delete_range() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
        let prefix = b"delete_range/";
        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
        unprepare_kv(&kv_backend, prefix).await;
    }

    /// Runs the shared compare-and-put test; the helper takes the backend behind an `Arc`.
    #[tokio::test]
    async fn test_mysql_compare_and_put() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("compare_and_put_test")
            .await
            .unwrap();
        let prefix = b"compare_and_put/";
        let kv_backend = Arc::new(kv_backend);
        test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
    }

    /// Runs the shared transaction tests one after another on a single table.
    #[tokio::test]
    async fn test_mysql_txn() {
        maybe_skip_mysql_integration_test!();
        let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
        test_txn_one_compare_op(&kv_backend).await;
        text_txn_multi_compare_op(&kv_backend).await;
        test_txn_compare_equal(&kv_backend).await;
        test_txn_compare_greater(&kv_backend).await;
        test_txn_compare_less(&kv_backend).await;
        test_txn_compare_not_equal(&kv_backend).await;
    }
}