common_meta/kv_backend/rds/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use snafu::ResultExt;
20use sqlx::mysql::MySqlRow;
21use sqlx::pool::Pool;
22use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
23use strum::AsRefStr;
24
25use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
26use crate::kv_backend::rds::{
27    Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
28    RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT,
29    RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::rpc::store::{
33    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
34    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
35};
36use crate::rpc::KeyValue;
37
38const MYSQL_STORE_NAME: &str = "mysql_store";
39
40type MySqlClient = Arc<Pool<MySql>>;
41pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
42
43fn key_value_from_row(row: MySqlRow) -> KeyValue {
44    // Safety: key and value are the first two columns in the row
45    KeyValue {
46        key: row.get_unchecked(0),
47        value: row.get_unchecked(1),
48    }
49}
50
51const EMPTY: &[u8] = &[0];
52
53/// Type of range template.
54#[derive(Debug, Clone, Copy, AsRefStr)]
55enum RangeTemplateType {
56    Point,
57    Range,
58    Full,
59    LeftBounded,
60    Prefix,
61}
62
63/// Builds params for the given range template type.
64impl RangeTemplateType {
65    /// Builds the parameters for the given range template type.
66    /// You can check out the conventions at [RangeRequest]
67    fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
68        match self {
69            RangeTemplateType::Point => vec![key],
70            RangeTemplateType::Range => vec![key, range_end],
71            RangeTemplateType::Full => vec![],
72            RangeTemplateType::LeftBounded => vec![key],
73            RangeTemplateType::Prefix => {
74                key.push(b'%');
75                vec![key]
76            }
77        }
78    }
79}
80
81/// Templates for range request.
82#[derive(Debug, Clone)]
83struct RangeTemplate {
84    point: String,
85    range: String,
86    full: String,
87    left_bounded: String,
88    prefix: String,
89}
90
91impl RangeTemplate {
92    /// Gets the template for the given type.
93    fn get(&self, typ: RangeTemplateType) -> &str {
94        match typ {
95            RangeTemplateType::Point => &self.point,
96            RangeTemplateType::Range => &self.range,
97            RangeTemplateType::Full => &self.full,
98            RangeTemplateType::LeftBounded => &self.left_bounded,
99            RangeTemplateType::Prefix => &self.prefix,
100        }
101    }
102
103    /// Adds limit to the template.
104    fn with_limit(template: &str, limit: i64) -> String {
105        if limit == 0 {
106            return format!("{};", template);
107        }
108        format!("{} LIMIT {};", template, limit)
109    }
110}
111
112fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
113    if start.len() != end.len() {
114        return false;
115    }
116    let l = start.len();
117    let same_prefix = start[0..l - 1] == end[0..l - 1];
118    if let (Some(rhs), Some(lhs)) = (start.last(), end.last()) {
119        return same_prefix && (*rhs + 1) == *lhs;
120    }
121    false
122}
123
124/// Determine the template type for range request.
125fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
126    match (key, range_end) {
127        (_, &[]) => RangeTemplateType::Point,
128        (EMPTY, EMPTY) => RangeTemplateType::Full,
129        (_, EMPTY) => RangeTemplateType::LeftBounded,
130        (start, end) => {
131            if is_prefix_range(start, end) {
132                RangeTemplateType::Prefix
133            } else {
134                RangeTemplateType::Range
135            }
136        }
137    }
138}
139
140/// Generate in placeholders for MySQL.
141fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
142    (from..=to).map(|_| "?".to_string()).collect()
143}
144
145/// Factory for building sql templates.
146struct MySqlTemplateFactory<'a> {
147    table_name: &'a str,
148}
149
150impl<'a> MySqlTemplateFactory<'a> {
151    /// Creates a new [`SqlTemplateFactory`] with the given table name.
152    fn new(table_name: &'a str) -> Self {
153        Self { table_name }
154    }
155
156    /// Builds the template set for the given table name.
157    fn build(&self) -> MySqlTemplateSet {
158        let table_name = self.table_name;
159        // Some of queries don't end with `;`, because we need to add `LIMIT` clause.
160        MySqlTemplateSet {
161            table_name: table_name.to_string(),
162            create_table_statement: format!(
163                // Cannot be more than 3072 bytes in PRIMARY KEY
164                "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
165            ),
166            range_template: RangeTemplate {
167                point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
168                range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
169                full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
170                left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
171                prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
172            },
173            delete_template: RangeTemplate {
174                point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
175                range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
176                full: format!("DELETE FROM `{table_name}`"),
177                left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
178                prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
179            },
180        }
181    }
182}
183
184/// Templates for the given table name.
185#[derive(Debug, Clone)]
186pub struct MySqlTemplateSet {
187    table_name: String,
188    create_table_statement: String,
189    range_template: RangeTemplate,
190    delete_template: RangeTemplate,
191}
192
193impl MySqlTemplateSet {
194    /// Generates the sql for batch get.
195    fn generate_batch_get_query(&self, key_len: usize) -> String {
196        let table_name = &self.table_name;
197        let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
198        format!(
199            "SELECT k, v FROM `{table_name}` WHERE k in ({});",
200            in_clause
201        )
202    }
203
204    /// Generates the sql for batch delete.
205    fn generate_batch_delete_query(&self, key_len: usize) -> String {
206        let table_name = &self.table_name;
207        let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
208        format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
209    }
210
211    /// Generates the sql for batch upsert.
212    /// For MySQL, it also generates a select query to get the previous values.
213    fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
214        let table_name = &self.table_name;
215        let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
216        let in_clause = in_placeholders.join(", ");
217        let mut values_placeholders = Vec::new();
218        for _ in 0..kv_len {
219            values_placeholders.push("(?, ?)".to_string());
220        }
221        let values_clause = values_placeholders.join(", ");
222
223        (
224            format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
225            format!(
226                r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
227            ),
228        )
229    }
230}
231
232#[async_trait::async_trait]
233impl Executor for MySqlClient {
234    type Transaction<'a>
235        = MySqlTxnClient
236    where
237        Self: 'a;
238
239    fn name() -> &'static str {
240        "MySql"
241    }
242
243    async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
244        let query = sqlx::query(raw_query);
245        let query = params.iter().fold(query, |query, param| query.bind(param));
246        let rows = query
247            .fetch_all(&**self)
248            .await
249            .context(MySqlExecutionSnafu { sql: raw_query })?;
250        Ok(rows.into_iter().map(key_value_from_row).collect())
251    }
252
253    async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
254        let query = sqlx::query(raw_query);
255        let query = params.iter().fold(query, |query, param| query.bind(param));
256        query
257            .execute(&**self)
258            .await
259            .context(MySqlExecutionSnafu { sql: raw_query })?;
260        Ok(())
261    }
262
263    async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
264        // sqlx has no isolation level support for now, so we have to set it manually.
265        // TODO(CookiePie): Waiting for https://github.com/launchbadge/sqlx/pull/3614 and remove this.
266        sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
267            .execute(&**self)
268            .await
269            .context(MySqlExecutionSnafu {
270                sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
271            })?;
272        let txn = self
273            .begin()
274            .await
275            .context(MySqlExecutionSnafu { sql: "begin" })?;
276        Ok(MySqlTxnClient(txn))
277    }
278}
279
280#[async_trait::async_trait]
281impl Transaction<'_> for MySqlTxnClient {
282    async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
283        let query = sqlx::query(raw_query);
284        let query = params.iter().fold(query, |query, param| query.bind(param));
285        // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
286        let rows = query
287            .fetch_all(&mut *(self.0))
288            .await
289            .context(MySqlExecutionSnafu { sql: raw_query })?;
290        Ok(rows.into_iter().map(key_value_from_row).collect())
291    }
292
293    async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
294        let query = sqlx::query(raw_query);
295        let query = params.iter().fold(query, |query, param| query.bind(param));
296        // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
297        query
298            .execute(&mut *(self.0))
299            .await
300            .context(MySqlExecutionSnafu { sql: raw_query })?;
301        Ok(())
302    }
303
304    /// Caution: sqlx will stuck on the query if two transactions conflict with each other.
305    /// Don't know if it's a feature or it depends on the database. Be careful.
306    async fn commit(self) -> Result<()> {
307        self.0.commit().await.context(MySqlTransactionSnafu {
308            operation: "commit",
309        })?;
310        Ok(())
311    }
312}
313
314pub struct MySqlExecutorFactory {
315    pool: Arc<Pool<MySql>>,
316}
317
318#[async_trait::async_trait]
319impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
320    async fn default_executor(&self) -> Result<MySqlClient> {
321        Ok(self.pool.clone())
322    }
323
324    async fn txn_executor<'a>(
325        &self,
326        default_executor: &'a mut MySqlClient,
327    ) -> Result<MySqlTxnClient> {
328        default_executor.txn_executor().await
329    }
330}
331
332/// A MySQL-backed key-value store.
333/// It uses [sqlx::Pool<MySql>] as the connection pool for [RdsStore].
334pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
335
336#[async_trait::async_trait]
337impl KvQueryExecutor<MySqlClient> for MySqlStore {
338    async fn range_with_query_executor(
339        &self,
340        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
341        req: RangeRequest,
342    ) -> Result<RangeResponse> {
343        let template_type = range_template(&req.key, &req.range_end);
344        let template = self.sql_template_set.range_template.get(template_type);
345        let params = template_type.build_params(req.key, req.range_end);
346        let params_ref = params.iter().collect::<Vec<_>>();
347        // Always add 1 to limit to check if there is more data
348        let query =
349            RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
350        let limit = req.limit as usize;
351        debug!("query: {:?}, params: {:?}", query, params);
352        let mut kvs = crate::record_rds_sql_execute_elapsed!(
353            query_executor.query(&query, &params_ref).await,
354            MYSQL_STORE_NAME,
355            RDS_STORE_OP_RANGE_QUERY,
356            template_type.as_ref()
357        )?;
358        if req.keys_only {
359            kvs.iter_mut().for_each(|kv| kv.value = vec![]);
360        }
361        // If limit is 0, we always return all data
362        if limit == 0 || kvs.len() <= limit {
363            return Ok(RangeResponse { kvs, more: false });
364        }
365        // If limit is greater than the number of rows, we remove the last row and set more to true
366        let removed = kvs.pop();
367        debug_assert!(removed.is_some());
368        Ok(RangeResponse { kvs, more: true })
369    }
370
371    async fn batch_put_with_query_executor(
372        &self,
373        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
374        req: BatchPutRequest,
375    ) -> Result<BatchPutResponse> {
376        let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
377        let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
378
379        for kv in &req.kvs {
380            let processed_key = &kv.key;
381            in_params.push(processed_key);
382
383            let processed_value = &kv.value;
384            values_params.push(processed_key);
385            values_params.push(processed_value);
386        }
387        let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
388        let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
389        let (select, update) = self
390            .sql_template_set
391            .generate_batch_upsert_query(req.kvs.len());
392
393        // Fast path: if we don't need previous kvs, we can just upsert the keys.
394        if !req.prev_kv {
395            crate::record_rds_sql_execute_elapsed!(
396                query_executor.execute(&update, &values_params).await,
397                MYSQL_STORE_NAME,
398                RDS_STORE_OP_BATCH_PUT,
399                ""
400            )?;
401            return Ok(BatchPutResponse::default());
402        }
403        // Should use transaction to ensure atomicity.
404        if let ExecutorImpl::Default(query_executor) = query_executor {
405            let txn = query_executor.txn_executor().await?;
406            let mut txn = ExecutorImpl::Txn(txn);
407            let res = self.batch_put_with_query_executor(&mut txn, req).await;
408            txn.commit().await?;
409            return res;
410        }
411        let prev_kvs = crate::record_rds_sql_execute_elapsed!(
412            query_executor.query(&select, &in_params).await,
413            MYSQL_STORE_NAME,
414            RDS_STORE_OP_BATCH_PUT,
415            ""
416        )?;
417        query_executor.execute(&update, &values_params).await?;
418        Ok(BatchPutResponse { prev_kvs })
419    }
420
421    async fn batch_get_with_query_executor(
422        &self,
423        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
424        req: BatchGetRequest,
425    ) -> Result<BatchGetResponse> {
426        if req.keys.is_empty() {
427            return Ok(BatchGetResponse { kvs: vec![] });
428        }
429        let query = self
430            .sql_template_set
431            .generate_batch_get_query(req.keys.len());
432        let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
433        let kvs = crate::record_rds_sql_execute_elapsed!(
434            query_executor.query(&query, &params).await,
435            MYSQL_STORE_NAME,
436            RDS_STORE_OP_BATCH_GET,
437            ""
438        )?;
439        Ok(BatchGetResponse { kvs })
440    }
441
442    async fn delete_range_with_query_executor(
443        &self,
444        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
445        req: DeleteRangeRequest,
446    ) -> Result<DeleteRangeResponse> {
447        // Since we need to know the number of deleted keys, we have no fast path here.
448        // Should use transaction to ensure atomicity.
449        if let ExecutorImpl::Default(query_executor) = query_executor {
450            let txn = query_executor.txn_executor().await?;
451            let mut txn = ExecutorImpl::Txn(txn);
452            let res = self.delete_range_with_query_executor(&mut txn, req).await;
453            txn.commit().await?;
454            return res;
455        }
456        let range_get_req = RangeRequest {
457            key: req.key.clone(),
458            range_end: req.range_end.clone(),
459            limit: 0,
460            keys_only: false,
461        };
462        let prev_kvs = self
463            .range_with_query_executor(query_executor, range_get_req)
464            .await?
465            .kvs;
466        let template_type = range_template(&req.key, &req.range_end);
467        let template = self.sql_template_set.delete_template.get(template_type);
468        let params = template_type.build_params(req.key, req.range_end);
469        let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
470        crate::record_rds_sql_execute_elapsed!(
471            query_executor.execute(template, &params_ref).await,
472            MYSQL_STORE_NAME,
473            RDS_STORE_OP_RANGE_DELETE,
474            template_type.as_ref()
475        )?;
476        let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
477        if req.prev_kv {
478            resp.with_prev_kvs(prev_kvs);
479        }
480        Ok(resp)
481    }
482
483    async fn batch_delete_with_query_executor(
484        &self,
485        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
486        req: BatchDeleteRequest,
487    ) -> Result<BatchDeleteResponse> {
488        if req.keys.is_empty() {
489            return Ok(BatchDeleteResponse::default());
490        }
491        let query = self
492            .sql_template_set
493            .generate_batch_delete_query(req.keys.len());
494        let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
495        // Fast path: if we don't need previous kvs, we can just delete the keys.
496        if !req.prev_kv {
497            crate::record_rds_sql_execute_elapsed!(
498                query_executor.execute(&query, &params).await,
499                MYSQL_STORE_NAME,
500                RDS_STORE_OP_BATCH_DELETE,
501                ""
502            )?;
503            return Ok(BatchDeleteResponse::default());
504        }
505        // Should use transaction to ensure atomicity.
506        if let ExecutorImpl::Default(query_executor) = query_executor {
507            let txn = query_executor.txn_executor().await?;
508            let mut txn = ExecutorImpl::Txn(txn);
509            let res = self.batch_delete_with_query_executor(&mut txn, req).await;
510            txn.commit().await?;
511            return res;
512        }
513        // Should get previous kvs first
514        let batch_get_req = BatchGetRequest {
515            keys: req.keys.clone(),
516        };
517        let prev_kvs = self
518            .batch_get_with_query_executor(query_executor, batch_get_req)
519            .await?
520            .kvs;
521        // Pure `DELETE` has no return value, so we need to use `execute` instead of `query`.
522        crate::record_rds_sql_execute_elapsed!(
523            query_executor.execute(&query, &params).await,
524            MYSQL_STORE_NAME,
525            RDS_STORE_OP_BATCH_DELETE,
526            ""
527        )?;
528        if req.prev_kv {
529            Ok(BatchDeleteResponse { prev_kvs })
530        } else {
531            Ok(BatchDeleteResponse::default())
532        }
533    }
534}
535
536impl MySqlStore {
537    /// Create [MySqlStore] impl of [KvBackendRef] from url.
538    pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
539        let pool = MySqlPool::connect(url)
540            .await
541            .context(CreateMySqlPoolSnafu)?;
542        Self::with_mysql_pool(pool, table_name, max_txn_ops).await
543    }
544
545    /// Create [MySqlStore] impl of [KvBackendRef] from [sqlx::Pool<MySql>].
546    pub async fn with_mysql_pool(
547        pool: Pool<MySql>,
548        table_name: &str,
549        max_txn_ops: usize,
550    ) -> Result<KvBackendRef> {
551        // This step ensures the mysql metadata backend is ready to use.
552        // We check if greptime_metakv table exists, and we will create a new table
553        // if it does not exist.
554        let sql_template_set = MySqlTemplateFactory::new(table_name).build();
555        sqlx::query(&sql_template_set.create_table_statement)
556            .execute(&pool)
557            .await
558            .context(MySqlExecutionSnafu {
559                sql: sql_template_set.create_table_statement.to_string(),
560            })?;
561        Ok(Arc::new(MySqlStore {
562            max_txn_ops,
563            sql_template_set,
564            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
565            executor_factory: MySqlExecutorFactory {
566                pool: Arc::new(pool),
567            },
568            _phantom: PhantomData,
569        }))
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use common_telemetry::init_default_ut_logging;
576
577    use super::*;
578    use crate::kv_backend::test::{
579        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
580        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
581        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
582        test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
583        test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
584        text_txn_multi_compare_op, unprepare_kv,
585    };
586    use crate::maybe_skip_mysql_integration_test;
587
588    async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
589        init_default_ut_logging();
590        let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
591        if endpoints.is_empty() {
592            return None;
593        }
594        let pool = MySqlPool::connect(&endpoints).await.unwrap();
595        let sql_templates = MySqlTemplateFactory::new(table_name).build();
596        sqlx::query(&sql_templates.create_table_statement)
597            .execute(&pool)
598            .await
599            .unwrap();
600        Some(MySqlStore {
601            max_txn_ops: 128,
602            sql_template_set: sql_templates,
603            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
604            executor_factory: MySqlExecutorFactory {
605                pool: Arc::new(pool),
606            },
607            _phantom: PhantomData,
608        })
609    }
610
611    #[tokio::test]
612    async fn test_mysql_put() {
613        maybe_skip_mysql_integration_test!();
614        let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
615        let prefix = b"put/";
616        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
617        test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
618        unprepare_kv(&kv_backend, prefix).await;
619    }
620
621    #[tokio::test]
622    async fn test_mysql_range() {
623        maybe_skip_mysql_integration_test!();
624        let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
625        let prefix = b"range/";
626        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
627        test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
628        unprepare_kv(&kv_backend, prefix).await;
629    }
630
631    #[tokio::test]
632    async fn test_mysql_range_2() {
633        maybe_skip_mysql_integration_test!();
634        let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
635        let prefix = b"range2/";
636        test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
637        unprepare_kv(&kv_backend, prefix).await;
638    }
639
640    #[tokio::test]
641    async fn test_mysql_all_range() {
642        maybe_skip_mysql_integration_test!();
643        let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap();
644        let prefix = b"";
645        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
646        test_simple_kv_range(&kv_backend).await;
647        unprepare_kv(&kv_backend, prefix).await;
648    }
649
650    #[tokio::test]
651    async fn test_mysql_batch_get() {
652        maybe_skip_mysql_integration_test!();
653        let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
654        let prefix = b"batch_get/";
655        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
656        test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
657        unprepare_kv(&kv_backend, prefix).await;
658    }
659
660    #[tokio::test]
661    async fn test_mysql_batch_delete() {
662        maybe_skip_mysql_integration_test!();
663        let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
664        let prefix = b"batch_delete/";
665        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
666        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
667        unprepare_kv(&kv_backend, prefix).await;
668    }
669
670    #[tokio::test]
671    async fn test_mysql_batch_delete_with_prefix() {
672        maybe_skip_mysql_integration_test!();
673        let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
674            .await
675            .unwrap();
676        let prefix = b"batch_delete/";
677        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
678        test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
679        unprepare_kv(&kv_backend, prefix).await;
680    }
681
682    #[tokio::test]
683    async fn test_mysql_delete_range() {
684        maybe_skip_mysql_integration_test!();
685        let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
686        let prefix = b"delete_range/";
687        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
688        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
689        unprepare_kv(&kv_backend, prefix).await;
690    }
691
692    #[tokio::test]
693    async fn test_mysql_compare_and_put() {
694        maybe_skip_mysql_integration_test!();
695        let kv_backend = build_mysql_kv_backend("compare_and_put_test")
696            .await
697            .unwrap();
698        let prefix = b"compare_and_put/";
699        let kv_backend = Arc::new(kv_backend);
700        test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
701    }
702
703    #[tokio::test]
704    async fn test_mysql_txn() {
705        maybe_skip_mysql_integration_test!();
706        let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
707        test_txn_one_compare_op(&kv_backend).await;
708        text_txn_multi_compare_op(&kv_backend).await;
709        test_txn_compare_equal(&kv_backend).await;
710        test_txn_compare_greater(&kv_backend).await;
711        test_txn_compare_less(&kv_backend).await;
712        test_txn_compare_not_equal(&kv_backend).await;
713    }
714}