Skip to main content

common_meta/kv_backend/rds/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info, warn};
19use lazy_static::lazy_static;
20use regex::Regex;
21use snafu::{OptionExt, ResultExt};
22use sqlx::mysql::MySqlRow;
23use sqlx::pool::Pool;
24use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
25use strum::AsRefStr;
26
27use crate::error::{
28    CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result, UnexpectedSnafu,
29};
30use crate::kv_backend::KvBackendRef;
31use crate::kv_backend::rds::{
32    Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
33    RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE,
34    RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction,
35    ensure_rustls_crypto_provider_installed,
36};
37use crate::rpc::KeyValue;
38use crate::rpc::store::{
39    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
40    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
41};
42
43const MYSQL_STORE_NAME: &str = "mysql_store";
44
/// Blob column types that can be recognized for the `v` (value) column of the
/// metadata table when inspecting its `SHOW CREATE TABLE` output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValueBlobType {
    /// The column is declared as `BLOB`.
    Blob,
    /// The column is declared as `MEDIUMBLOB`.
    MediumBlob,
    /// The column is declared as `LONGBLOB`.
    LongBlob,
}
51
// Matches the definition of the `v` column inside `CREATE TABLE` DDL and captures
// its blob type (`longblob`, `mediumblob` or `blob`). Matching is case-insensitive,
// the column name may be wrapped in backticks or double quotes, and the name must
// directly follow `(` or `,` so that longer names such as `vv` or `value` do not match.
lazy_static! {
    static ref VALUE_COLUMN_BLOB_TYPE_RE: Regex =
        Regex::new(r#"(?i)(?:\(|,)\s*[`"]?v[`"]?\s+(longblob|mediumblob|blob)\b"#).unwrap();
}
56
/// Shared handle to the `sqlx` MySQL connection pool; used as the default
/// (non-transactional) executor.
type MySqlClient = Arc<Pool<MySql>>;
/// Wrapper around an open `sqlx` MySQL transaction; used as the transactional executor.
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
59
60fn key_value_from_row(row: MySqlRow) -> KeyValue {
61    // Safety: key and value are the first two columns in the row
62    KeyValue {
63        key: row.get_unchecked(0),
64        value: row.get_unchecked(1),
65    }
66}
67
/// Sentinel byte string (a single zero byte) that `range_template` matches against
/// the start key and/or range end to select the `Full` and `LeftBounded` templates.
const EMPTY: &[u8] = &[0];
69
/// Type of range template.
///
/// Selects which SQL statement of a [`RangeTemplate`] is used for a request and
/// how its parameters are built.
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
    /// Exact match on a single key (`k = ?`).
    Point,
    /// Half-open key interval (`k >= ? AND k < ?`).
    Range,
    /// Every row of the table; takes no parameters.
    Full,
    /// Every key greater than or equal to the start key (`k >= ?`).
    LeftBounded,
    /// Every key matching a `LIKE` pattern derived from the start key (`k LIKE ?`).
    Prefix,
}
79
80/// Builds params for the given range template type.
81impl RangeTemplateType {
82    /// Builds the parameters for the given range template type.
83    /// You can check out the conventions at [RangeRequest]
84    fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
85        match self {
86            RangeTemplateType::Point => vec![key],
87            RangeTemplateType::Range => vec![key, range_end],
88            RangeTemplateType::Full => vec![],
89            RangeTemplateType::LeftBounded => vec![key],
90            RangeTemplateType::Prefix => {
91                key.push(b'%');
92                vec![key]
93            }
94        }
95    }
96}
97
/// Templates for range request.
///
/// Holds one SQL statement per [`RangeTemplateType`] variant.
#[derive(Debug, Clone)]
struct RangeTemplate {
    /// Statement used for [`RangeTemplateType::Point`].
    point: String,
    /// Statement used for [`RangeTemplateType::Range`].
    range: String,
    /// Statement used for [`RangeTemplateType::Full`].
    full: String,
    /// Statement used for [`RangeTemplateType::LeftBounded`].
    left_bounded: String,
    /// Statement used for [`RangeTemplateType::Prefix`].
    prefix: String,
}
107
108impl RangeTemplate {
109    /// Gets the template for the given type.
110    fn get(&self, typ: RangeTemplateType) -> &str {
111        match typ {
112            RangeTemplateType::Point => &self.point,
113            RangeTemplateType::Range => &self.range,
114            RangeTemplateType::Full => &self.full,
115            RangeTemplateType::LeftBounded => &self.left_bounded,
116            RangeTemplateType::Prefix => &self.prefix,
117        }
118    }
119
120    /// Adds limit to the template.
121    fn with_limit(template: &str, limit: i64) -> String {
122        if limit == 0 {
123            return format!("{};", template);
124        }
125        format!("{} LIMIT {};", template, limit)
126    }
127}
128
/// Returns `true` when `[start, end)` is exactly the range of all keys sharing the
/// prefix `start`, i.e. both slices have the same non-zero length, agree on every
/// byte but the last, and the last byte of `end` is the last byte of `start` plus one.
///
/// Empty slices and a `start` ending in `0xFF` (which has no single-byte successor)
/// are never prefix ranges; both cases return `false` instead of panicking or
/// wrapping around.
fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
    if start.len() != end.len() {
        return false;
    }
    match (start.split_last(), end.split_last()) {
        (Some((start_last, start_head)), Some((end_last, end_head))) => {
            // `checked_add` yields `None` for 0xFF, which can never equal `Some(_)`.
            start_head == end_head && start_last.checked_add(1) == Some(*end_last)
        }
        _ => false,
    }
}
140
141/// Determine the template type for range request.
142fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
143    match (key, range_end) {
144        (_, &[]) => RangeTemplateType::Point,
145        (EMPTY, EMPTY) => RangeTemplateType::Full,
146        (_, EMPTY) => RangeTemplateType::LeftBounded,
147        (start, end) => {
148            if is_prefix_range(start, end) {
149                RangeTemplateType::Prefix
150            } else {
151                RangeTemplateType::Range
152            }
153        }
154    }
155}
156
/// Generates one `?` placeholder for every index in the inclusive range
/// `from..=to` (MySQL placeholders are positional, so the index itself is unused).
fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
    let mut placeholders = Vec::new();
    for _ in from..=to {
        placeholders.push(String::from("?"));
    }
    placeholders
}
161
/// Factory for building sql templates.
struct MySqlTemplateFactory<'a> {
    /// Name of the table every generated statement refers to.
    table_name: &'a str,
}
166
167impl<'a> MySqlTemplateFactory<'a> {
168    /// Creates a new [`SqlTemplateFactory`] with the given table name.
169    fn new(table_name: &'a str) -> Self {
170        Self { table_name }
171    }
172
173    /// Builds the template set for the given table name.
174    fn build(&self) -> MySqlTemplateSet {
175        let table_name = self.table_name;
176        // Some of queries don't end with `;`, because we need to add `LIMIT` clause.
177        MySqlTemplateSet {
178            table_name: table_name.to_string(),
179            create_table_statement: format!(
180                // Cannot be more than 3072 bytes in PRIMARY KEY
181                "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v MEDIUMBLOB);",
182            ),
183            show_create_table_statement: format!("SHOW CREATE TABLE `{table_name}`"),
184            alter_value_column_statement: format!(
185                "ALTER TABLE `{table_name}` MODIFY COLUMN v MEDIUMBLOB;"
186            ),
187            range_template: RangeTemplate {
188                point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
189                range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
190                full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
191                left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
192                prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
193            },
194            delete_template: RangeTemplate {
195                point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
196                range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
197                full: format!("DELETE FROM `{table_name}`"),
198                left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
199                prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
200            },
201        }
202    }
203}
204
/// Templates for the given table name.
#[derive(Debug, Clone)]
pub struct MySqlTemplateSet {
    /// Name of the table the statements operate on.
    table_name: String,
    /// `CREATE TABLE IF NOT EXISTS` statement for the key-value table.
    create_table_statement: String,
    /// `SHOW CREATE TABLE` statement used to inspect the existing schema.
    show_create_table_statement: String,
    /// `ALTER TABLE` statement that changes the `v` column to `MEDIUMBLOB`.
    alter_value_column_statement: String,
    /// `SELECT` statements, one per range template type.
    range_template: RangeTemplate,
    /// `DELETE` statements, one per range template type.
    delete_template: RangeTemplate,
}
215
216impl MySqlTemplateSet {
217    /// Generates the sql for batch get.
218    fn generate_batch_get_query(&self, key_len: usize) -> String {
219        let table_name = &self.table_name;
220        let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
221        format!(
222            "SELECT k, v FROM `{table_name}` WHERE k in ({});",
223            in_clause
224        )
225    }
226
227    /// Generates the sql for batch delete.
228    fn generate_batch_delete_query(&self, key_len: usize) -> String {
229        let table_name = &self.table_name;
230        let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
231        format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
232    }
233
234    /// Generates the sql for batch upsert.
235    /// For MySQL, it also generates a select query to get the previous values.
236    fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
237        let table_name = &self.table_name;
238        let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
239        let in_clause = in_placeholders.join(", ");
240        let mut values_placeholders = Vec::new();
241        for _ in 0..kv_len {
242            values_placeholders.push("(?, ?)".to_string());
243        }
244        let values_clause = values_placeholders.join(", ");
245
246        (
247            format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
248            format!(
249                r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
250            ),
251        )
252    }
253}
254
255#[async_trait::async_trait]
256impl Executor for MySqlClient {
257    type Transaction<'a>
258        = MySqlTxnClient
259    where
260        Self: 'a;
261
262    fn name() -> &'static str {
263        "MySql"
264    }
265
266    async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
267        let query = sqlx::query(raw_query);
268        let query = params.iter().fold(query, |query, param| query.bind(param));
269        let rows = query
270            .fetch_all(&**self)
271            .await
272            .context(MySqlExecutionSnafu { sql: raw_query })?;
273        Ok(rows.into_iter().map(key_value_from_row).collect())
274    }
275
276    async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
277        let query = sqlx::query(raw_query);
278        let query = params.iter().fold(query, |query, param| query.bind(param));
279        query
280            .execute(&**self)
281            .await
282            .context(MySqlExecutionSnafu { sql: raw_query })?;
283        Ok(())
284    }
285
286    async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
287        // sqlx has no isolation level support for now, so we have to set it manually.
288        // TODO(CookiePie): Waiting for https://github.com/launchbadge/sqlx/pull/3614 and remove this.
289        sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
290            .execute(&**self)
291            .await
292            .context(MySqlExecutionSnafu {
293                sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
294            })?;
295        let txn = self
296            .begin()
297            .await
298            .context(MySqlExecutionSnafu { sql: "begin" })?;
299        Ok(MySqlTxnClient(txn))
300    }
301}
302
303#[async_trait::async_trait]
304impl Transaction<'_> for MySqlTxnClient {
305    async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
306        let query = sqlx::query(raw_query);
307        let query = params.iter().fold(query, |query, param| query.bind(param));
308        // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
309        let rows = query
310            .fetch_all(&mut *(self.0))
311            .await
312            .context(MySqlExecutionSnafu { sql: raw_query })?;
313        Ok(rows.into_iter().map(key_value_from_row).collect())
314    }
315
316    async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
317        let query = sqlx::query(raw_query);
318        let query = params.iter().fold(query, |query, param| query.bind(param));
319        // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
320        query
321            .execute(&mut *(self.0))
322            .await
323            .context(MySqlExecutionSnafu { sql: raw_query })?;
324        Ok(())
325    }
326
327    /// Caution: sqlx will stuck on the query if two transactions conflict with each other.
328    /// Don't know if it's a feature or it depends on the database. Be careful.
329    async fn commit(self) -> Result<()> {
330        self.0.commit().await.context(MySqlTransactionSnafu {
331            operation: "commit",
332        })?;
333        Ok(())
334    }
335}
336
/// Hands out executors backed by a shared MySQL connection pool.
pub struct MySqlExecutorFactory {
    /// Connection pool cloned (by reference count) into every default executor.
    pool: Arc<Pool<MySql>>,
}
340
341#[async_trait::async_trait]
342impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
343    async fn default_executor(&self) -> Result<MySqlClient> {
344        Ok(self.pool.clone())
345    }
346
347    async fn txn_executor<'a>(
348        &self,
349        default_executor: &'a mut MySqlClient,
350    ) -> Result<MySqlTxnClient> {
351        default_executor.txn_executor().await
352    }
353}
354
/// A MySQL-backed key-value store.
/// It uses [sqlx::Pool<MySql>] as the connection pool for [RdsStore].
///
/// This is [RdsStore] specialized with [MySqlClient] as the executor,
/// [MySqlExecutorFactory] as the executor factory and [MySqlTemplateSet] as the
/// SQL template set.
pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
358
359#[async_trait::async_trait]
360impl KvQueryExecutor<MySqlClient> for MySqlStore {
361    async fn range_with_query_executor(
362        &self,
363        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
364        req: RangeRequest,
365    ) -> Result<RangeResponse> {
366        let template_type = range_template(&req.key, &req.range_end);
367        let template = self.sql_template_set.range_template.get(template_type);
368        let params = template_type.build_params(req.key, req.range_end);
369        let params_ref = params.iter().collect::<Vec<_>>();
370        // Always add 1 to limit to check if there is more data
371        let query =
372            RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
373        let limit = req.limit as usize;
374        debug!("query: {:?}, params: {:?}", query, params);
375        let mut kvs = crate::record_rds_sql_execute_elapsed!(
376            query_executor.query(&query, &params_ref).await,
377            MYSQL_STORE_NAME,
378            RDS_STORE_OP_RANGE_QUERY,
379            template_type.as_ref()
380        )?;
381        if req.keys_only {
382            kvs.iter_mut().for_each(|kv| kv.value = vec![]);
383        }
384        // If limit is 0, we always return all data
385        if limit == 0 || kvs.len() <= limit {
386            return Ok(RangeResponse { kvs, more: false });
387        }
388        // If limit is greater than the number of rows, we remove the last row and set more to true
389        let removed = kvs.pop();
390        debug_assert!(removed.is_some());
391        Ok(RangeResponse { kvs, more: true })
392    }
393
394    async fn batch_put_with_query_executor(
395        &self,
396        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
397        req: BatchPutRequest,
398    ) -> Result<BatchPutResponse> {
399        let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
400        let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
401
402        for kv in &req.kvs {
403            let processed_key = &kv.key;
404            in_params.push(processed_key);
405
406            let processed_value = &kv.value;
407            values_params.push(processed_key);
408            values_params.push(processed_value);
409        }
410        let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
411        let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
412        let (select, update) = self
413            .sql_template_set
414            .generate_batch_upsert_query(req.kvs.len());
415
416        // Fast path: if we don't need previous kvs, we can just upsert the keys.
417        if !req.prev_kv {
418            crate::record_rds_sql_execute_elapsed!(
419                query_executor.execute(&update, &values_params).await,
420                MYSQL_STORE_NAME,
421                RDS_STORE_OP_BATCH_PUT,
422                ""
423            )?;
424            return Ok(BatchPutResponse::default());
425        }
426        // Should use transaction to ensure atomicity.
427        if let ExecutorImpl::Default(query_executor) = query_executor {
428            let txn = query_executor.txn_executor().await?;
429            let mut txn = ExecutorImpl::Txn(txn);
430            let res = self.batch_put_with_query_executor(&mut txn, req).await;
431            txn.commit().await?;
432            return res;
433        }
434        let prev_kvs = crate::record_rds_sql_execute_elapsed!(
435            query_executor.query(&select, &in_params).await,
436            MYSQL_STORE_NAME,
437            RDS_STORE_OP_BATCH_PUT,
438            ""
439        )?;
440        query_executor.execute(&update, &values_params).await?;
441        Ok(BatchPutResponse { prev_kvs })
442    }
443
444    async fn batch_get_with_query_executor(
445        &self,
446        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
447        req: BatchGetRequest,
448    ) -> Result<BatchGetResponse> {
449        if req.keys.is_empty() {
450            return Ok(BatchGetResponse { kvs: vec![] });
451        }
452        let query = self
453            .sql_template_set
454            .generate_batch_get_query(req.keys.len());
455        let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
456        let kvs = crate::record_rds_sql_execute_elapsed!(
457            query_executor.query(&query, &params).await,
458            MYSQL_STORE_NAME,
459            RDS_STORE_OP_BATCH_GET,
460            ""
461        )?;
462        Ok(BatchGetResponse { kvs })
463    }
464
465    async fn delete_range_with_query_executor(
466        &self,
467        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
468        req: DeleteRangeRequest,
469    ) -> Result<DeleteRangeResponse> {
470        // Since we need to know the number of deleted keys, we have no fast path here.
471        // Should use transaction to ensure atomicity.
472        if let ExecutorImpl::Default(query_executor) = query_executor {
473            let txn = query_executor.txn_executor().await?;
474            let mut txn = ExecutorImpl::Txn(txn);
475            let res = self.delete_range_with_query_executor(&mut txn, req).await;
476            txn.commit().await?;
477            return res;
478        }
479        let range_get_req = RangeRequest {
480            key: req.key.clone(),
481            range_end: req.range_end.clone(),
482            limit: 0,
483            keys_only: false,
484        };
485        let prev_kvs = self
486            .range_with_query_executor(query_executor, range_get_req)
487            .await?
488            .kvs;
489        let template_type = range_template(&req.key, &req.range_end);
490        let template = self.sql_template_set.delete_template.get(template_type);
491        let params = template_type.build_params(req.key, req.range_end);
492        let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
493        crate::record_rds_sql_execute_elapsed!(
494            query_executor.execute(template, &params_ref).await,
495            MYSQL_STORE_NAME,
496            RDS_STORE_OP_RANGE_DELETE,
497            template_type.as_ref()
498        )?;
499        let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
500        if req.prev_kv {
501            resp.with_prev_kvs(prev_kvs);
502        }
503        Ok(resp)
504    }
505
506    async fn batch_delete_with_query_executor(
507        &self,
508        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
509        req: BatchDeleteRequest,
510    ) -> Result<BatchDeleteResponse> {
511        if req.keys.is_empty() {
512            return Ok(BatchDeleteResponse::default());
513        }
514        let query = self
515            .sql_template_set
516            .generate_batch_delete_query(req.keys.len());
517        let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
518        // Fast path: if we don't need previous kvs, we can just delete the keys.
519        if !req.prev_kv {
520            crate::record_rds_sql_execute_elapsed!(
521                query_executor.execute(&query, &params).await,
522                MYSQL_STORE_NAME,
523                RDS_STORE_OP_BATCH_DELETE,
524                ""
525            )?;
526            return Ok(BatchDeleteResponse::default());
527        }
528        // Should use transaction to ensure atomicity.
529        if let ExecutorImpl::Default(query_executor) = query_executor {
530            let txn = query_executor.txn_executor().await?;
531            let mut txn = ExecutorImpl::Txn(txn);
532            let res = self.batch_delete_with_query_executor(&mut txn, req).await;
533            txn.commit().await?;
534            return res;
535        }
536        // Should get previous kvs first
537        let batch_get_req = BatchGetRequest {
538            keys: req.keys.clone(),
539        };
540        let prev_kvs = self
541            .batch_get_with_query_executor(query_executor, batch_get_req)
542            .await?
543            .kvs;
544        // Pure `DELETE` has no return value, so we need to use `execute` instead of `query`.
545        crate::record_rds_sql_execute_elapsed!(
546            query_executor.execute(&query, &params).await,
547            MYSQL_STORE_NAME,
548            RDS_STORE_OP_BATCH_DELETE,
549            ""
550        )?;
551        if req.prev_kv {
552            Ok(BatchDeleteResponse { prev_kvs })
553        } else {
554            Ok(BatchDeleteResponse::default())
555        }
556    }
557}
558
559impl MySqlStore {
560    /// Reads the current table definition for best-effort schema upgrades.
561    async fn fetch_create_table_sql(
562        pool: &Pool<MySql>,
563        sql_template_set: &MySqlTemplateSet,
564    ) -> Result<Option<String>> {
565        let row = sqlx::query(&sql_template_set.show_create_table_statement)
566            .fetch_optional(pool)
567            .await
568            .with_context(|_| MySqlExecutionSnafu {
569                sql: sql_template_set.show_create_table_statement.clone(),
570            })?;
571        Ok(row.map(|row| row.get(1)))
572    }
573
574    /// Parses the blob type of the `v` column from `SHOW CREATE TABLE` output.
575    fn parse_value_column_blob_type(create_table_sql: &str) -> Option<ValueBlobType> {
576        // `SHOW CREATE TABLE` returns MySQL-specific DDL. A minimal parser keeps the
577        // upgrade check small and avoids introducing a SQL parser just for one column.
578        let captures = VALUE_COLUMN_BLOB_TYPE_RE.captures(create_table_sql)?;
579        match captures.get(1)?.as_str().to_ascii_lowercase().as_str() {
580            "blob" => Some(ValueBlobType::Blob),
581            "mediumblob" => Some(ValueBlobType::MediumBlob),
582            "longblob" => Some(ValueBlobType::LongBlob),
583            _ => None,
584        }
585    }
586
587    /// Upgrades the metadata value column to `MEDIUMBLOB` when an old table still uses `BLOB`.
588    async fn maybe_upgrade_value_column_to_mediumblob(
589        pool: &Pool<MySql>,
590        sql_template_set: &MySqlTemplateSet,
591    ) -> Result<()> {
592        let table_name = &sql_template_set.table_name;
593        let create_table_sql = Self::fetch_create_table_sql(pool, sql_template_set)
594            .await?
595            .context(UnexpectedSnafu {
596                err_msg: format!("Failed to fetch CREATE TABLE SQL for `{table_name}`"),
597            })?;
598
599        match Self::parse_value_column_blob_type(&create_table_sql) {
600            Some(ValueBlobType::Blob) => {
601                sqlx::query(&sql_template_set.alter_value_column_statement)
602                    .execute(pool)
603                    .await
604                    .with_context(|_| MySqlExecutionSnafu {
605                        sql: sql_template_set.alter_value_column_statement.clone(),
606                    })?;
607                info!("Upgraded MySQL metadata value column to MEDIUMBLOB for `{table_name}`");
608            }
609            Some(ValueBlobType::MediumBlob | ValueBlobType::LongBlob) => {
610                debug!("MySQL metadata value column for `{table_name}` is already compatible");
611            }
612            None => {
613                warn!(
614                    "Failed to determine MySQL metadata value column type from table definition for `{table_name}`, skip automatic MEDIUMBLOB upgrade"
615                );
616            }
617        }
618
619        Ok(())
620    }
621
622    /// Create [MySqlStore] impl of [KvBackendRef] from url.
623    pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
624        ensure_rustls_crypto_provider_installed()?;
625        let pool = MySqlPool::connect(url)
626            .await
627            .context(CreateMySqlPoolSnafu)?;
628        Self::with_mysql_pool(pool, table_name, max_txn_ops).await
629    }
630
631    /// Create [MySqlStore] impl of [KvBackendRef] from [sqlx::Pool<MySql>].
632    pub async fn with_mysql_pool(
633        pool: Pool<MySql>,
634        table_name: &str,
635        max_txn_ops: usize,
636    ) -> Result<KvBackendRef> {
637        // This step ensures the mysql metadata backend is ready to use.
638        // We check if greptime_metakv table exists, and we will create a new table
639        // if it does not exist.
640        let sql_template_set = MySqlTemplateFactory::new(table_name).build();
641        sqlx::query(&sql_template_set.create_table_statement)
642            .execute(&pool)
643            .await
644            .with_context(|_| MySqlExecutionSnafu {
645                sql: sql_template_set.create_table_statement.clone(),
646            })?;
647        Self::maybe_upgrade_value_column_to_mediumblob(&pool, &sql_template_set).await?;
648        Ok(Arc::new(MySqlStore {
649            max_txn_ops,
650            sql_template_set,
651            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
652            executor_factory: MySqlExecutorFactory {
653                pool: Arc::new(pool),
654            },
655            _phantom: PhantomData,
656        }))
657    }
658}
659
660#[cfg(test)]
661mod tests {
662    use common_telemetry::init_default_ut_logging;
663    use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode};
664    use uuid::Uuid;
665
666    use super::*;
667    use crate::kv_backend::test::{
668        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
669        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
670        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
671        test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
672        test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
673        text_txn_multi_compare_op, unprepare_kv,
674    };
675    use crate::maybe_skip_mysql_integration_test;
676    use crate::rpc::store::{PutRequest, RangeRequest};
677    use crate::test_util::test_certs_dir;
678
679    fn new_test_table_name(prefix: &str) -> String {
680        let uuid = Uuid::new_v4().simple().to_string();
681        let max_prefix_len = 63usize.saturating_sub(uuid.len() + 1);
682        let prefix = &prefix[..prefix.len().min(max_prefix_len)];
683        format!("{prefix}_{uuid}")
684    }
685
686    async fn mysql_pool() -> Option<MySqlPool> {
687        init_default_ut_logging();
688        let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
689        if endpoints.is_empty() {
690            return None;
691        }
692        ensure_rustls_crypto_provider_installed().unwrap();
693        Some(MySqlPool::connect(&endpoints).await.unwrap())
694    }
695
696    async fn show_create_table(pool: &MySqlPool, table_name: &str) -> String {
697        let sql = format!("SHOW CREATE TABLE `{table_name}`");
698        let row = sqlx::query(&sql).fetch_one(pool).await.unwrap();
699        row.get::<String, _>(1)
700    }
701
702    async fn create_legacy_blob_table(pool: &MySqlPool, table_name: &str) {
703        let sql = format!(
704            "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);"
705        );
706        sqlx::query(&sql).execute(pool).await.unwrap();
707    }
708
709    async fn drop_table(pool: &MySqlPool, table_name: &str) {
710        let sql = format!("DROP TABLE IF EXISTS `{table_name}`;");
711        sqlx::query(&sql).execute(pool).await.unwrap();
712    }
713
714    async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
715        let pool = mysql_pool().await?;
716        let sql_templates = MySqlTemplateFactory::new(table_name).build();
717        sqlx::query(&sql_templates.create_table_statement)
718            .execute(&pool)
719            .await
720            .unwrap();
721        Some(MySqlStore {
722            max_txn_ops: 128,
723            sql_template_set: sql_templates,
724            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
725            executor_factory: MySqlExecutorFactory {
726                pool: Arc::new(pool),
727            },
728            _phantom: PhantomData,
729        })
730    }
731
732    #[test]
733    fn test_parse_value_column_blob_type() {
734        let sql = r#"CREATE TABLE `greptime_metakv` (
735  `k` varbinary(3072) NOT NULL,
736  `v` MEDIUMBLOB,
737  PRIMARY KEY (`k`)
738) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"#;
739        assert_eq!(
740            Some(ValueBlobType::MediumBlob),
741            MySqlStore::parse_value_column_blob_type(sql)
742        );
743
744        let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` blob, PRIMARY KEY (`k`))"#;
745        assert_eq!(
746            Some(ValueBlobType::Blob),
747            MySqlStore::parse_value_column_blob_type(sql)
748        );
749
750        let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` longblob, PRIMARY KEY (`k`))"#;
751        assert_eq!(
752            Some(ValueBlobType::LongBlob),
753            MySqlStore::parse_value_column_blob_type(sql)
754        );
755
756        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v`    BLOB NOT NULL, PRIMARY KEY (`k`))";
757        assert_eq!(
758            Some(ValueBlobType::Blob),
759            MySqlStore::parse_value_column_blob_type(sql)
760        );
761
762        let sql = "CREATE TABLE `greptime_metakv` (\n  `k` varbinary(3072) NOT NULL,\n  \"v\" MediumBlob,\n  PRIMARY KEY (`k`)\n)";
763        assert_eq!(
764            Some(ValueBlobType::MediumBlob),
765            MySqlStore::parse_value_column_blob_type(sql)
766        );
767
768        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, vv blob, `v` longblob, PRIMARY KEY (`k`))";
769        assert_eq!(
770            Some(ValueBlobType::LongBlob),
771            MySqlStore::parse_value_column_blob_type(sql)
772        );
773
774        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, value blob, PRIMARY KEY (`k`))";
775        assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
776
777        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` varchar(255), PRIMARY KEY (`k`))";
778        assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
779
780        let sql =
781            "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, PRIMARY KEY (`k`))";
782        assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
783    }
784
785    #[tokio::test]
786    async fn test_mysql_new_metadata_table_uses_mediumblob() {
787        maybe_skip_mysql_integration_test!();
788        let pool = mysql_pool().await.unwrap();
789        let table_name = new_test_table_name("test_mysql_mediumblob_schema");
790
791        MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
792            .await
793            .unwrap();
794
795        let create_table = show_create_table(&pool, &table_name).await;
796        assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB"));
797
798        drop_table(&pool, &table_name).await;
799    }
800
801    #[tokio::test]
802    async fn test_mysql_legacy_blob_metadata_table_is_upgraded() {
803        maybe_skip_mysql_integration_test!();
804        let pool = mysql_pool().await.unwrap();
805        let table_name = new_test_table_name("test_mysql_legacy_blob_upgrade");
806
807        create_legacy_blob_table(&pool, &table_name).await;
808        MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
809            .await
810            .unwrap();
811
812        let create_table = show_create_table(&pool, &table_name).await;
813        assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB"));
814
815        drop_table(&pool, &table_name).await;
816    }
817
818    #[tokio::test]
819    async fn test_mysql_metadata_table_stores_large_values() {
820        maybe_skip_mysql_integration_test!();
821        let pool = mysql_pool().await.unwrap();
822        let table_name = new_test_table_name("test_mysql_large_metadata_value");
823        let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
824            .await
825            .unwrap();
826        let key = b"large-value".to_vec();
827        let value = vec![b'x'; 70 * 1024];
828
829        kv_backend
830            .put(
831                PutRequest::new()
832                    .with_key(key.clone())
833                    .with_value(value.clone()),
834            )
835            .await
836            .unwrap();
837        let response = kv_backend
838            .range(RangeRequest::new().with_key(key.clone()))
839            .await
840            .unwrap();
841
842        assert_eq!(1, response.kvs.len());
843        assert_eq!(key, response.kvs[0].key);
844        assert_eq!(value, response.kvs[0].value);
845
846        drop_table(&pool, &table_name).await;
847    }
848
849    #[tokio::test]
850    async fn test_mysql_upgraded_metadata_table_stores_large_values() {
851        maybe_skip_mysql_integration_test!();
852        let pool = mysql_pool().await.unwrap();
853        let table_name = new_test_table_name("test_mysql_upgraded_large_metadata_value");
854
855        create_legacy_blob_table(&pool, &table_name).await;
856        let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
857            .await
858            .unwrap();
859        let key = b"large-value".to_vec();
860        let value = vec![b'y'; 70 * 1024];
861
862        kv_backend
863            .put(
864                PutRequest::new()
865                    .with_key(key.clone())
866                    .with_value(value.clone()),
867            )
868            .await
869            .unwrap();
870        let response = kv_backend
871            .range(RangeRequest::new().with_key(key.clone()))
872            .await
873            .unwrap();
874
875        assert_eq!(1, response.kvs.len());
876        assert_eq!(key, response.kvs[0].key);
877        assert_eq!(value, response.kvs[0].value);
878
879        drop_table(&pool, &table_name).await;
880    }
881
882    #[tokio::test]
883    async fn test_mysql_put() {
884        maybe_skip_mysql_integration_test!();
885        let kv_backend = build_mysql_kv_backend("put-test").await.unwrap();
886        let prefix = b"put/";
887        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
888        test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
889        unprepare_kv(&kv_backend, prefix).await;
890    }
891
892    #[tokio::test]
893    async fn test_mysql_range() {
894        maybe_skip_mysql_integration_test!();
895        let kv_backend = build_mysql_kv_backend("range-test").await.unwrap();
896        let prefix = b"range/";
897        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
898        test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
899        unprepare_kv(&kv_backend, prefix).await;
900    }
901
902    #[tokio::test]
903    async fn test_mysql_range_2() {
904        maybe_skip_mysql_integration_test!();
905        let kv_backend = build_mysql_kv_backend("range2-test").await.unwrap();
906        let prefix = b"range2/";
907        test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
908        unprepare_kv(&kv_backend, prefix).await;
909    }
910
911    #[tokio::test]
912    async fn test_mysql_all_range() {
913        maybe_skip_mysql_integration_test!();
914        let kv_backend = build_mysql_kv_backend("simple_range-test").await.unwrap();
915        let prefix = b"";
916        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
917        test_simple_kv_range(&kv_backend).await;
918        unprepare_kv(&kv_backend, prefix).await;
919    }
920
921    #[tokio::test]
922    async fn test_mysql_batch_get() {
923        maybe_skip_mysql_integration_test!();
924        let kv_backend = build_mysql_kv_backend("batch_get-test").await.unwrap();
925        let prefix = b"batch_get/";
926        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
927        test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
928        unprepare_kv(&kv_backend, prefix).await;
929    }
930
931    #[tokio::test]
932    async fn test_mysql_batch_delete() {
933        maybe_skip_mysql_integration_test!();
934        let kv_backend = build_mysql_kv_backend("batch_delete-test").await.unwrap();
935        let prefix = b"batch_delete/";
936        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
937        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
938        unprepare_kv(&kv_backend, prefix).await;
939    }
940
941    #[tokio::test]
942    async fn test_mysql_batch_delete_with_prefix() {
943        maybe_skip_mysql_integration_test!();
944        let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix-test")
945            .await
946            .unwrap();
947        let prefix = b"batch_delete/";
948        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
949        test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
950        unprepare_kv(&kv_backend, prefix).await;
951    }
952
953    #[tokio::test]
954    async fn test_mysql_delete_range() {
955        maybe_skip_mysql_integration_test!();
956        let kv_backend = build_mysql_kv_backend("delete_range-test").await.unwrap();
957        let prefix = b"delete_range/";
958        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
959        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
960        unprepare_kv(&kv_backend, prefix).await;
961    }
962
963    #[tokio::test]
964    async fn test_mysql_compare_and_put() {
965        maybe_skip_mysql_integration_test!();
966        let kv_backend = build_mysql_kv_backend("compare_and_put-test")
967            .await
968            .unwrap();
969        let prefix = b"compare_and_put/";
970        let kv_backend = Arc::new(kv_backend);
971        test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
972    }
973
974    #[tokio::test]
975    async fn test_mysql_txn() {
976        maybe_skip_mysql_integration_test!();
977        let kv_backend = build_mysql_kv_backend("txn-test").await.unwrap();
978        test_txn_one_compare_op(&kv_backend).await;
979        text_txn_multi_compare_op(&kv_backend).await;
980        test_txn_compare_equal(&kv_backend).await;
981        test_txn_compare_greater(&kv_backend).await;
982        test_txn_compare_less(&kv_backend).await;
983        test_txn_compare_not_equal(&kv_backend).await;
984    }
985
986    #[tokio::test]
987    async fn test_mysql_with_tls() {
988        common_telemetry::init_default_ut_logging();
989        maybe_skip_mysql_integration_test!();
990        ensure_rustls_crypto_provider_installed().unwrap();
991        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
992
993        let opts = endpoint
994            .parse::<MySqlConnectOptions>()
995            .unwrap()
996            .ssl_mode(MySqlSslMode::Required);
997        let pool = MySqlPool::connect_with(opts).await.unwrap();
998        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
999    }
1000
1001    #[tokio::test]
1002    async fn test_mysql_with_mtls() {
1003        common_telemetry::init_default_ut_logging();
1004        maybe_skip_mysql_integration_test!();
1005        ensure_rustls_crypto_provider_installed().unwrap();
1006        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1007        let certs_dir = test_certs_dir();
1008
1009        let opts = endpoint
1010            .parse::<MySqlConnectOptions>()
1011            .unwrap()
1012            .ssl_mode(MySqlSslMode::Required)
1013            .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1014            .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1015        let pool = MySqlPool::connect_with(opts).await.unwrap();
1016        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1017    }
1018
1019    #[tokio::test]
1020    async fn test_mysql_with_tls_verify_ca() {
1021        common_telemetry::init_default_ut_logging();
1022        maybe_skip_mysql_integration_test!();
1023        ensure_rustls_crypto_provider_installed().unwrap();
1024        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1025        let certs_dir = test_certs_dir();
1026
1027        let opts = endpoint
1028            .parse::<MySqlConnectOptions>()
1029            .unwrap()
1030            .ssl_mode(MySqlSslMode::VerifyCa)
1031            .ssl_ca(certs_dir.join("root.crt").to_string_lossy().to_string())
1032            .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1033            .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1034        let pool = MySqlPool::connect_with(opts).await.unwrap();
1035        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1036    }
1037
1038    #[tokio::test]
1039    async fn test_mysql_with_tls_verify_ident() {
1040        common_telemetry::init_default_ut_logging();
1041        maybe_skip_mysql_integration_test!();
1042        ensure_rustls_crypto_provider_installed().unwrap();
1043        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1044        let certs_dir = test_certs_dir();
1045
1046        let opts = endpoint
1047            .parse::<MySqlConnectOptions>()
1048            .unwrap()
1049            .ssl_mode(MySqlSslMode::VerifyIdentity)
1050            .ssl_ca(certs_dir.join("root.crt").to_string_lossy().to_string())
1051            .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1052            .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1053        let pool = MySqlPool::connect_with(opts).await.unwrap();
1054        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1055    }
1056}