Skip to main content

common_meta/kv_backend/rds/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info, warn};
19use lazy_static::lazy_static;
20use regex::Regex;
21use snafu::{OptionExt, ResultExt};
22use sqlx::mysql::MySqlRow;
23use sqlx::pool::Pool;
24use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
25use strum::AsRefStr;
26
27use crate::error::{
28    CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result, UnexpectedSnafu,
29};
30use crate::kv_backend::KvBackendRef;
31use crate::kv_backend::rds::{
32    Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
33    RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE,
34    RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction,
35};
36use crate::rpc::KeyValue;
37use crate::rpc::store::{
38    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
39    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
40};
41
/// Store name passed to the `record_rds_sql_execute_elapsed!` invocations in this file.
const MYSQL_STORE_NAME: &str = "mysql_store";
43
/// Blob type of the `v` column as recognised in `SHOW CREATE TABLE` output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValueBlobType {
    /// `BLOB`: the only type that triggers the automatic `MEDIUMBLOB` upgrade.
    Blob,
    /// `MEDIUMBLOB`: the type used by the `CREATE TABLE` statement in this file.
    MediumBlob,
    /// `LONGBLOB`: treated as already compatible, so it is left untouched.
    LongBlob,
}
50
lazy_static! {
    /// Case-insensitive matcher for the `v` column definition inside a `CREATE TABLE`
    /// statement. The column name (optionally wrapped in backticks or double quotes) must
    /// directly follow `(` or `,`; capture group 1 holds the blob type keyword.
    static ref VALUE_COLUMN_BLOB_TYPE_RE: Regex =
        Regex::new(r#"(?i)(?:\(|,)\s*[`"]?v[`"]?\s+(longblob|mediumblob|blob)\b"#).unwrap();
}
55
/// Reference-counted handle to the sqlx MySQL connection pool.
type MySqlClient = Arc<Pool<MySql>>;
/// Newtype wrapping an sqlx MySQL transaction.
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
58
59fn key_value_from_row(row: MySqlRow) -> KeyValue {
60    // Safety: key and value are the first two columns in the row
61    KeyValue {
62        key: row.get_unchecked(0),
63        value: row.get_unchecked(1),
64    }
65}
66
/// A single zero byte. `range_template` compares the key and the range end against it
/// to select the `Full` and `LeftBounded` templates. Despite the name, this is not an
/// empty slice.
const EMPTY: &[u8] = &[0];
68
/// Type of range template.
///
/// The variant is chosen by `range_template` from the request's `key` and `range_end`.
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
    /// `range_end` is empty: exact match on `key`.
    Point,
    /// Any other pair: half-open interval `[key, range_end)`.
    Range,
    /// Both `key` and `range_end` are the single zero byte: every row.
    Full,
    /// Only `range_end` is the single zero byte: every row with `k >= key`.
    LeftBounded,
    /// `range_end` equals `key` with its last byte incremented by one: `LIKE` prefix match.
    Prefix,
}
78
79/// Builds params for the given range template type.
80impl RangeTemplateType {
81    /// Builds the parameters for the given range template type.
82    /// You can check out the conventions at [RangeRequest]
83    fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
84        match self {
85            RangeTemplateType::Point => vec![key],
86            RangeTemplateType::Range => vec![key, range_end],
87            RangeTemplateType::Full => vec![],
88            RangeTemplateType::LeftBounded => vec![key],
89            RangeTemplateType::Prefix => {
90                key.push(b'%');
91                vec![key]
92            }
93        }
94    }
95}
96
/// Templates for range request.
///
/// Holds one SQL string per [`RangeTemplateType`] variant.
#[derive(Debug, Clone)]
struct RangeTemplate {
    /// SQL used for `RangeTemplateType::Point`.
    point: String,
    /// SQL used for `RangeTemplateType::Range`.
    range: String,
    /// SQL used for `RangeTemplateType::Full`.
    full: String,
    /// SQL used for `RangeTemplateType::LeftBounded`.
    left_bounded: String,
    /// SQL used for `RangeTemplateType::Prefix`.
    prefix: String,
}
106
107impl RangeTemplate {
108    /// Gets the template for the given type.
109    fn get(&self, typ: RangeTemplateType) -> &str {
110        match typ {
111            RangeTemplateType::Point => &self.point,
112            RangeTemplateType::Range => &self.range,
113            RangeTemplateType::Full => &self.full,
114            RangeTemplateType::LeftBounded => &self.left_bounded,
115            RangeTemplateType::Prefix => &self.prefix,
116        }
117    }
118
119    /// Adds limit to the template.
120    fn with_limit(template: &str, limit: i64) -> String {
121        if limit == 0 {
122            return format!("{};", template);
123        }
124        format!("{} LIMIT {};", template, limit)
125    }
126}
127
/// Returns `true` when `[start, end)` is exactly the range covered by the prefix `start`.
///
/// That is the case when both slices have the same non-zero length, agree on every byte
/// except the last one, and the last byte of `end` is the last byte of `start` plus one.
///
/// Empty input yields `false` instead of panicking, and a `start` ending in `0xFF` yields
/// `false` because its last byte cannot be incremented without overflowing.
fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
    match (start.split_last(), end.split_last()) {
        (Some((&start_last, start_head)), Some((&end_last, end_head))) => {
            // Comparing the heads as slices also enforces equal lengths.
            start_head == end_head && start_last.checked_add(1) == Some(end_last)
        }
        // At least one side is empty, so there is no last byte to compare.
        _ => false,
    }
}
139
140/// Determine the template type for range request.
141fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
142    match (key, range_end) {
143        (_, &[]) => RangeTemplateType::Point,
144        (EMPTY, EMPTY) => RangeTemplateType::Full,
145        (_, EMPTY) => RangeTemplateType::LeftBounded,
146        (start, end) => {
147            if is_prefix_range(start, end) {
148                RangeTemplateType::Prefix
149            } else {
150                RangeTemplateType::Range
151            }
152        }
153    }
154}
155
/// Builds the `?` placeholders for a MySQL `IN (...)` list.
///
/// One placeholder is produced for every index in `from..=to`; MySQL placeholders are
/// positional, so the indices themselves are not rendered. The result is empty when
/// `from > to`.
fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
    let mut placeholders = Vec::new();
    for _ in from..=to {
        placeholders.push(String::from("?"));
    }
    placeholders
}
160
/// Factory for building sql templates.
struct MySqlTemplateFactory<'a> {
    /// Name of the table every generated statement targets; it is interpolated between
    /// backticks as-is.
    table_name: &'a str,
}
165
166impl<'a> MySqlTemplateFactory<'a> {
167    /// Creates a new [`SqlTemplateFactory`] with the given table name.
168    fn new(table_name: &'a str) -> Self {
169        Self { table_name }
170    }
171
172    /// Builds the template set for the given table name.
173    fn build(&self) -> MySqlTemplateSet {
174        let table_name = self.table_name;
175        // Some of queries don't end with `;`, because we need to add `LIMIT` clause.
176        MySqlTemplateSet {
177            table_name: table_name.to_string(),
178            create_table_statement: format!(
179                // Cannot be more than 3072 bytes in PRIMARY KEY
180                "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v MEDIUMBLOB);",
181            ),
182            show_create_table_statement: format!("SHOW CREATE TABLE `{table_name}`"),
183            alter_value_column_statement: format!(
184                "ALTER TABLE `{table_name}` MODIFY COLUMN v MEDIUMBLOB;"
185            ),
186            range_template: RangeTemplate {
187                point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
188                range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
189                full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
190                left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
191                prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
192            },
193            delete_template: RangeTemplate {
194                point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
195                range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
196                full: format!("DELETE FROM `{table_name}`"),
197                left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
198                prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
199            },
200        }
201    }
202}
203
/// Templates for the given table name.
#[derive(Debug, Clone)]
pub struct MySqlTemplateSet {
    /// Table every statement in this set targets.
    table_name: String,
    /// `CREATE TABLE IF NOT EXISTS` statement for the key-value table.
    create_table_statement: String,
    /// `SHOW CREATE TABLE` statement used to inspect the existing schema.
    show_create_table_statement: String,
    /// `ALTER TABLE` statement that changes the `v` column to `MEDIUMBLOB`.
    alter_value_column_statement: String,
    /// `SELECT` templates, one per range template type.
    range_template: RangeTemplate,
    /// `DELETE` templates, one per range template type.
    delete_template: RangeTemplate,
}
214
215impl MySqlTemplateSet {
216    /// Generates the sql for batch get.
217    fn generate_batch_get_query(&self, key_len: usize) -> String {
218        let table_name = &self.table_name;
219        let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
220        format!(
221            "SELECT k, v FROM `{table_name}` WHERE k in ({});",
222            in_clause
223        )
224    }
225
226    /// Generates the sql for batch delete.
227    fn generate_batch_delete_query(&self, key_len: usize) -> String {
228        let table_name = &self.table_name;
229        let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
230        format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
231    }
232
233    /// Generates the sql for batch upsert.
234    /// For MySQL, it also generates a select query to get the previous values.
235    fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
236        let table_name = &self.table_name;
237        let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
238        let in_clause = in_placeholders.join(", ");
239        let mut values_placeholders = Vec::new();
240        for _ in 0..kv_len {
241            values_placeholders.push("(?, ?)".to_string());
242        }
243        let values_clause = values_placeholders.join(", ");
244
245        (
246            format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
247            format!(
248                r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
249            ),
250        )
251    }
252}
253
254#[async_trait::async_trait]
255impl Executor for MySqlClient {
256    type Transaction<'a>
257        = MySqlTxnClient
258    where
259        Self: 'a;
260
261    fn name() -> &'static str {
262        "MySql"
263    }
264
265    async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
266        let query = sqlx::query(raw_query);
267        let query = params.iter().fold(query, |query, param| query.bind(param));
268        let rows = query
269            .fetch_all(&**self)
270            .await
271            .context(MySqlExecutionSnafu { sql: raw_query })?;
272        Ok(rows.into_iter().map(key_value_from_row).collect())
273    }
274
275    async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
276        let query = sqlx::query(raw_query);
277        let query = params.iter().fold(query, |query, param| query.bind(param));
278        query
279            .execute(&**self)
280            .await
281            .context(MySqlExecutionSnafu { sql: raw_query })?;
282        Ok(())
283    }
284
285    async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
286        // sqlx has no isolation level support for now, so we have to set it manually.
287        // TODO(CookiePie): Waiting for https://github.com/launchbadge/sqlx/pull/3614 and remove this.
288        sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
289            .execute(&**self)
290            .await
291            .context(MySqlExecutionSnafu {
292                sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
293            })?;
294        let txn = self
295            .begin()
296            .await
297            .context(MySqlExecutionSnafu { sql: "begin" })?;
298        Ok(MySqlTxnClient(txn))
299    }
300}
301
302#[async_trait::async_trait]
303impl Transaction<'_> for MySqlTxnClient {
304    async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
305        let query = sqlx::query(raw_query);
306        let query = params.iter().fold(query, |query, param| query.bind(param));
307        // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
308        let rows = query
309            .fetch_all(&mut *(self.0))
310            .await
311            .context(MySqlExecutionSnafu { sql: raw_query })?;
312        Ok(rows.into_iter().map(key_value_from_row).collect())
313    }
314
315    async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
316        let query = sqlx::query(raw_query);
317        let query = params.iter().fold(query, |query, param| query.bind(param));
318        // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
319        query
320            .execute(&mut *(self.0))
321            .await
322            .context(MySqlExecutionSnafu { sql: raw_query })?;
323        Ok(())
324    }
325
326    /// Caution: sqlx will stuck on the query if two transactions conflict with each other.
327    /// Don't know if it's a feature or it depends on the database. Be careful.
328    async fn commit(self) -> Result<()> {
329        self.0.commit().await.context(MySqlTransactionSnafu {
330            operation: "commit",
331        })?;
332        Ok(())
333    }
334}
335
/// Hands out executors backed by a shared MySQL connection pool.
pub struct MySqlExecutorFactory {
    /// Pool cloned (by reference count) into every default executor.
    pool: Arc<Pool<MySql>>,
}
339
340#[async_trait::async_trait]
341impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
342    async fn default_executor(&self) -> Result<MySqlClient> {
343        Ok(self.pool.clone())
344    }
345
346    async fn txn_executor<'a>(
347        &self,
348        default_executor: &'a mut MySqlClient,
349    ) -> Result<MySqlTxnClient> {
350        default_executor.txn_executor().await
351    }
352}
353
/// A MySQL-backed key-value store.
/// It uses [sqlx::Pool<MySql>] as the connection pool for [RdsStore], with
/// [MySqlExecutorFactory] creating executors and [MySqlTemplateSet] supplying the SQL.
pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
357
358#[async_trait::async_trait]
359impl KvQueryExecutor<MySqlClient> for MySqlStore {
360    async fn range_with_query_executor(
361        &self,
362        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
363        req: RangeRequest,
364    ) -> Result<RangeResponse> {
365        let template_type = range_template(&req.key, &req.range_end);
366        let template = self.sql_template_set.range_template.get(template_type);
367        let params = template_type.build_params(req.key, req.range_end);
368        let params_ref = params.iter().collect::<Vec<_>>();
369        // Always add 1 to limit to check if there is more data
370        let query =
371            RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
372        let limit = req.limit as usize;
373        debug!("query: {:?}, params: {:?}", query, params);
374        let mut kvs = crate::record_rds_sql_execute_elapsed!(
375            query_executor.query(&query, &params_ref).await,
376            MYSQL_STORE_NAME,
377            RDS_STORE_OP_RANGE_QUERY,
378            template_type.as_ref()
379        )?;
380        if req.keys_only {
381            kvs.iter_mut().for_each(|kv| kv.value = vec![]);
382        }
383        // If limit is 0, we always return all data
384        if limit == 0 || kvs.len() <= limit {
385            return Ok(RangeResponse { kvs, more: false });
386        }
387        // If limit is greater than the number of rows, we remove the last row and set more to true
388        let removed = kvs.pop();
389        debug_assert!(removed.is_some());
390        Ok(RangeResponse { kvs, more: true })
391    }
392
393    async fn batch_put_with_query_executor(
394        &self,
395        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
396        req: BatchPutRequest,
397    ) -> Result<BatchPutResponse> {
398        let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
399        let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
400
401        for kv in &req.kvs {
402            let processed_key = &kv.key;
403            in_params.push(processed_key);
404
405            let processed_value = &kv.value;
406            values_params.push(processed_key);
407            values_params.push(processed_value);
408        }
409        let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
410        let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
411        let (select, update) = self
412            .sql_template_set
413            .generate_batch_upsert_query(req.kvs.len());
414
415        // Fast path: if we don't need previous kvs, we can just upsert the keys.
416        if !req.prev_kv {
417            crate::record_rds_sql_execute_elapsed!(
418                query_executor.execute(&update, &values_params).await,
419                MYSQL_STORE_NAME,
420                RDS_STORE_OP_BATCH_PUT,
421                ""
422            )?;
423            return Ok(BatchPutResponse::default());
424        }
425        // Should use transaction to ensure atomicity.
426        if let ExecutorImpl::Default(query_executor) = query_executor {
427            let txn = query_executor.txn_executor().await?;
428            let mut txn = ExecutorImpl::Txn(txn);
429            let res = self.batch_put_with_query_executor(&mut txn, req).await;
430            txn.commit().await?;
431            return res;
432        }
433        let prev_kvs = crate::record_rds_sql_execute_elapsed!(
434            query_executor.query(&select, &in_params).await,
435            MYSQL_STORE_NAME,
436            RDS_STORE_OP_BATCH_PUT,
437            ""
438        )?;
439        query_executor.execute(&update, &values_params).await?;
440        Ok(BatchPutResponse { prev_kvs })
441    }
442
443    async fn batch_get_with_query_executor(
444        &self,
445        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
446        req: BatchGetRequest,
447    ) -> Result<BatchGetResponse> {
448        if req.keys.is_empty() {
449            return Ok(BatchGetResponse { kvs: vec![] });
450        }
451        let query = self
452            .sql_template_set
453            .generate_batch_get_query(req.keys.len());
454        let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
455        let kvs = crate::record_rds_sql_execute_elapsed!(
456            query_executor.query(&query, &params).await,
457            MYSQL_STORE_NAME,
458            RDS_STORE_OP_BATCH_GET,
459            ""
460        )?;
461        Ok(BatchGetResponse { kvs })
462    }
463
464    async fn delete_range_with_query_executor(
465        &self,
466        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
467        req: DeleteRangeRequest,
468    ) -> Result<DeleteRangeResponse> {
469        // Since we need to know the number of deleted keys, we have no fast path here.
470        // Should use transaction to ensure atomicity.
471        if let ExecutorImpl::Default(query_executor) = query_executor {
472            let txn = query_executor.txn_executor().await?;
473            let mut txn = ExecutorImpl::Txn(txn);
474            let res = self.delete_range_with_query_executor(&mut txn, req).await;
475            txn.commit().await?;
476            return res;
477        }
478        let range_get_req = RangeRequest {
479            key: req.key.clone(),
480            range_end: req.range_end.clone(),
481            limit: 0,
482            keys_only: false,
483        };
484        let prev_kvs = self
485            .range_with_query_executor(query_executor, range_get_req)
486            .await?
487            .kvs;
488        let template_type = range_template(&req.key, &req.range_end);
489        let template = self.sql_template_set.delete_template.get(template_type);
490        let params = template_type.build_params(req.key, req.range_end);
491        let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
492        crate::record_rds_sql_execute_elapsed!(
493            query_executor.execute(template, &params_ref).await,
494            MYSQL_STORE_NAME,
495            RDS_STORE_OP_RANGE_DELETE,
496            template_type.as_ref()
497        )?;
498        let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
499        if req.prev_kv {
500            resp.with_prev_kvs(prev_kvs);
501        }
502        Ok(resp)
503    }
504
505    async fn batch_delete_with_query_executor(
506        &self,
507        query_executor: &mut ExecutorImpl<'_, MySqlClient>,
508        req: BatchDeleteRequest,
509    ) -> Result<BatchDeleteResponse> {
510        if req.keys.is_empty() {
511            return Ok(BatchDeleteResponse::default());
512        }
513        let query = self
514            .sql_template_set
515            .generate_batch_delete_query(req.keys.len());
516        let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
517        // Fast path: if we don't need previous kvs, we can just delete the keys.
518        if !req.prev_kv {
519            crate::record_rds_sql_execute_elapsed!(
520                query_executor.execute(&query, &params).await,
521                MYSQL_STORE_NAME,
522                RDS_STORE_OP_BATCH_DELETE,
523                ""
524            )?;
525            return Ok(BatchDeleteResponse::default());
526        }
527        // Should use transaction to ensure atomicity.
528        if let ExecutorImpl::Default(query_executor) = query_executor {
529            let txn = query_executor.txn_executor().await?;
530            let mut txn = ExecutorImpl::Txn(txn);
531            let res = self.batch_delete_with_query_executor(&mut txn, req).await;
532            txn.commit().await?;
533            return res;
534        }
535        // Should get previous kvs first
536        let batch_get_req = BatchGetRequest {
537            keys: req.keys.clone(),
538        };
539        let prev_kvs = self
540            .batch_get_with_query_executor(query_executor, batch_get_req)
541            .await?
542            .kvs;
543        // Pure `DELETE` has no return value, so we need to use `execute` instead of `query`.
544        crate::record_rds_sql_execute_elapsed!(
545            query_executor.execute(&query, &params).await,
546            MYSQL_STORE_NAME,
547            RDS_STORE_OP_BATCH_DELETE,
548            ""
549        )?;
550        if req.prev_kv {
551            Ok(BatchDeleteResponse { prev_kvs })
552        } else {
553            Ok(BatchDeleteResponse::default())
554        }
555    }
556}
557
558impl MySqlStore {
559    /// Reads the current table definition for best-effort schema upgrades.
560    async fn fetch_create_table_sql(
561        pool: &Pool<MySql>,
562        sql_template_set: &MySqlTemplateSet,
563    ) -> Result<Option<String>> {
564        let row = sqlx::query(&sql_template_set.show_create_table_statement)
565            .fetch_optional(pool)
566            .await
567            .with_context(|_| MySqlExecutionSnafu {
568                sql: sql_template_set.show_create_table_statement.clone(),
569            })?;
570        Ok(row.map(|row| row.get(1)))
571    }
572
573    /// Parses the blob type of the `v` column from `SHOW CREATE TABLE` output.
574    fn parse_value_column_blob_type(create_table_sql: &str) -> Option<ValueBlobType> {
575        // `SHOW CREATE TABLE` returns MySQL-specific DDL. A minimal parser keeps the
576        // upgrade check small and avoids introducing a SQL parser just for one column.
577        let captures = VALUE_COLUMN_BLOB_TYPE_RE.captures(create_table_sql)?;
578        match captures.get(1)?.as_str().to_ascii_lowercase().as_str() {
579            "blob" => Some(ValueBlobType::Blob),
580            "mediumblob" => Some(ValueBlobType::MediumBlob),
581            "longblob" => Some(ValueBlobType::LongBlob),
582            _ => None,
583        }
584    }
585
586    /// Upgrades the metadata value column to `MEDIUMBLOB` when an old table still uses `BLOB`.
587    async fn maybe_upgrade_value_column_to_mediumblob(
588        pool: &Pool<MySql>,
589        sql_template_set: &MySqlTemplateSet,
590    ) -> Result<()> {
591        let table_name = &sql_template_set.table_name;
592        let create_table_sql = Self::fetch_create_table_sql(pool, sql_template_set)
593            .await?
594            .context(UnexpectedSnafu {
595                err_msg: format!("Failed to fetch CREATE TABLE SQL for `{table_name}`"),
596            })?;
597
598        match Self::parse_value_column_blob_type(&create_table_sql) {
599            Some(ValueBlobType::Blob) => {
600                sqlx::query(&sql_template_set.alter_value_column_statement)
601                    .execute(pool)
602                    .await
603                    .with_context(|_| MySqlExecutionSnafu {
604                        sql: sql_template_set.alter_value_column_statement.clone(),
605                    })?;
606                info!("Upgraded MySQL metadata value column to MEDIUMBLOB for `{table_name}`");
607            }
608            Some(ValueBlobType::MediumBlob | ValueBlobType::LongBlob) => {
609                debug!("MySQL metadata value column for `{table_name}` is already compatible");
610            }
611            None => {
612                warn!(
613                    "Failed to determine MySQL metadata value column type from table definition for `{table_name}`, skip automatic MEDIUMBLOB upgrade"
614                );
615            }
616        }
617
618        Ok(())
619    }
620
621    /// Create [MySqlStore] impl of [KvBackendRef] from url.
622    pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
623        let pool = MySqlPool::connect(url)
624            .await
625            .context(CreateMySqlPoolSnafu)?;
626        Self::with_mysql_pool(pool, table_name, max_txn_ops).await
627    }
628
629    /// Create [MySqlStore] impl of [KvBackendRef] from [sqlx::Pool<MySql>].
630    pub async fn with_mysql_pool(
631        pool: Pool<MySql>,
632        table_name: &str,
633        max_txn_ops: usize,
634    ) -> Result<KvBackendRef> {
635        // This step ensures the mysql metadata backend is ready to use.
636        // We check if greptime_metakv table exists, and we will create a new table
637        // if it does not exist.
638        let sql_template_set = MySqlTemplateFactory::new(table_name).build();
639        sqlx::query(&sql_template_set.create_table_statement)
640            .execute(&pool)
641            .await
642            .with_context(|_| MySqlExecutionSnafu {
643                sql: sql_template_set.create_table_statement.clone(),
644            })?;
645        Self::maybe_upgrade_value_column_to_mediumblob(&pool, &sql_template_set).await?;
646        Ok(Arc::new(MySqlStore {
647            max_txn_ops,
648            sql_template_set,
649            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
650            executor_factory: MySqlExecutorFactory {
651                pool: Arc::new(pool),
652            },
653            _phantom: PhantomData,
654        }))
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use common_telemetry::init_default_ut_logging;
661    use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode};
662    use uuid::Uuid;
663
664    use super::*;
665    use crate::kv_backend::test::{
666        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
667        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
668        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
669        test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
670        test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
671        text_txn_multi_compare_op, unprepare_kv,
672    };
673    use crate::maybe_skip_mysql_integration_test;
674    use crate::rpc::store::{PutRequest, RangeRequest};
675    use crate::test_util::test_certs_dir;
676
677    fn new_test_table_name(prefix: &str) -> String {
678        let uuid = Uuid::new_v4().simple().to_string();
679        let max_prefix_len = 63usize.saturating_sub(uuid.len() + 1);
680        let prefix = &prefix[..prefix.len().min(max_prefix_len)];
681        format!("{prefix}_{uuid}")
682    }
683
684    async fn mysql_pool() -> Option<MySqlPool> {
685        init_default_ut_logging();
686        let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
687        if endpoints.is_empty() {
688            return None;
689        }
690        Some(MySqlPool::connect(&endpoints).await.unwrap())
691    }
692
693    async fn show_create_table(pool: &MySqlPool, table_name: &str) -> String {
694        let sql = format!("SHOW CREATE TABLE `{table_name}`");
695        let row = sqlx::query(&sql).fetch_one(pool).await.unwrap();
696        row.get::<String, _>(1)
697    }
698
699    async fn create_legacy_blob_table(pool: &MySqlPool, table_name: &str) {
700        let sql = format!(
701            "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);"
702        );
703        sqlx::query(&sql).execute(pool).await.unwrap();
704    }
705
706    async fn drop_table(pool: &MySqlPool, table_name: &str) {
707        let sql = format!("DROP TABLE IF EXISTS `{table_name}`;");
708        sqlx::query(&sql).execute(pool).await.unwrap();
709    }
710
711    async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
712        let pool = mysql_pool().await?;
713        let sql_templates = MySqlTemplateFactory::new(table_name).build();
714        sqlx::query(&sql_templates.create_table_statement)
715            .execute(&pool)
716            .await
717            .unwrap();
718        Some(MySqlStore {
719            max_txn_ops: 128,
720            sql_template_set: sql_templates,
721            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
722            executor_factory: MySqlExecutorFactory {
723                pool: Arc::new(pool),
724            },
725            _phantom: PhantomData,
726        })
727    }
728
729    #[test]
730    fn test_parse_value_column_blob_type() {
731        let sql = r#"CREATE TABLE `greptime_metakv` (
732  `k` varbinary(3072) NOT NULL,
733  `v` MEDIUMBLOB,
734  PRIMARY KEY (`k`)
735) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"#;
736        assert_eq!(
737            Some(ValueBlobType::MediumBlob),
738            MySqlStore::parse_value_column_blob_type(sql)
739        );
740
741        let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` blob, PRIMARY KEY (`k`))"#;
742        assert_eq!(
743            Some(ValueBlobType::Blob),
744            MySqlStore::parse_value_column_blob_type(sql)
745        );
746
747        let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` longblob, PRIMARY KEY (`k`))"#;
748        assert_eq!(
749            Some(ValueBlobType::LongBlob),
750            MySqlStore::parse_value_column_blob_type(sql)
751        );
752
753        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v`    BLOB NOT NULL, PRIMARY KEY (`k`))";
754        assert_eq!(
755            Some(ValueBlobType::Blob),
756            MySqlStore::parse_value_column_blob_type(sql)
757        );
758
759        let sql = "CREATE TABLE `greptime_metakv` (\n  `k` varbinary(3072) NOT NULL,\n  \"v\" MediumBlob,\n  PRIMARY KEY (`k`)\n)";
760        assert_eq!(
761            Some(ValueBlobType::MediumBlob),
762            MySqlStore::parse_value_column_blob_type(sql)
763        );
764
765        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, vv blob, `v` longblob, PRIMARY KEY (`k`))";
766        assert_eq!(
767            Some(ValueBlobType::LongBlob),
768            MySqlStore::parse_value_column_blob_type(sql)
769        );
770
771        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, value blob, PRIMARY KEY (`k`))";
772        assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
773
774        let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` varchar(255), PRIMARY KEY (`k`))";
775        assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
776
777        let sql =
778            "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, PRIMARY KEY (`k`))";
779        assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
780    }
781
782    #[tokio::test]
783    async fn test_mysql_new_metadata_table_uses_mediumblob() {
784        maybe_skip_mysql_integration_test!();
785        let pool = mysql_pool().await.unwrap();
786        let table_name = new_test_table_name("test_mysql_mediumblob_schema");
787
788        MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
789            .await
790            .unwrap();
791
792        let create_table = show_create_table(&pool, &table_name).await;
793        assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB"));
794
795        drop_table(&pool, &table_name).await;
796    }
797
798    #[tokio::test]
799    async fn test_mysql_legacy_blob_metadata_table_is_upgraded() {
800        maybe_skip_mysql_integration_test!();
801        let pool = mysql_pool().await.unwrap();
802        let table_name = new_test_table_name("test_mysql_legacy_blob_upgrade");
803
804        create_legacy_blob_table(&pool, &table_name).await;
805        MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
806            .await
807            .unwrap();
808
809        let create_table = show_create_table(&pool, &table_name).await;
810        assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB"));
811
812        drop_table(&pool, &table_name).await;
813    }
814
815    #[tokio::test]
816    async fn test_mysql_metadata_table_stores_large_values() {
817        maybe_skip_mysql_integration_test!();
818        let pool = mysql_pool().await.unwrap();
819        let table_name = new_test_table_name("test_mysql_large_metadata_value");
820        let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
821            .await
822            .unwrap();
823        let key = b"large-value".to_vec();
824        let value = vec![b'x'; 70 * 1024];
825
826        kv_backend
827            .put(
828                PutRequest::new()
829                    .with_key(key.clone())
830                    .with_value(value.clone()),
831            )
832            .await
833            .unwrap();
834        let response = kv_backend
835            .range(RangeRequest::new().with_key(key.clone()))
836            .await
837            .unwrap();
838
839        assert_eq!(1, response.kvs.len());
840        assert_eq!(key, response.kvs[0].key);
841        assert_eq!(value, response.kvs[0].value);
842
843        drop_table(&pool, &table_name).await;
844    }
845
846    #[tokio::test]
847    async fn test_mysql_upgraded_metadata_table_stores_large_values() {
848        maybe_skip_mysql_integration_test!();
849        let pool = mysql_pool().await.unwrap();
850        let table_name = new_test_table_name("test_mysql_upgraded_large_metadata_value");
851
852        create_legacy_blob_table(&pool, &table_name).await;
853        let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
854            .await
855            .unwrap();
856        let key = b"large-value".to_vec();
857        let value = vec![b'y'; 70 * 1024];
858
859        kv_backend
860            .put(
861                PutRequest::new()
862                    .with_key(key.clone())
863                    .with_value(value.clone()),
864            )
865            .await
866            .unwrap();
867        let response = kv_backend
868            .range(RangeRequest::new().with_key(key.clone()))
869            .await
870            .unwrap();
871
872        assert_eq!(1, response.kvs.len());
873        assert_eq!(key, response.kvs[0].key);
874        assert_eq!(value, response.kvs[0].value);
875
876        drop_table(&pool, &table_name).await;
877    }
878
879    #[tokio::test]
880    async fn test_mysql_put() {
881        maybe_skip_mysql_integration_test!();
882        let kv_backend = build_mysql_kv_backend("put-test").await.unwrap();
883        let prefix = b"put/";
884        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
885        test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
886        unprepare_kv(&kv_backend, prefix).await;
887    }
888
889    #[tokio::test]
890    async fn test_mysql_range() {
891        maybe_skip_mysql_integration_test!();
892        let kv_backend = build_mysql_kv_backend("range-test").await.unwrap();
893        let prefix = b"range/";
894        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
895        test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
896        unprepare_kv(&kv_backend, prefix).await;
897    }
898
899    #[tokio::test]
900    async fn test_mysql_range_2() {
901        maybe_skip_mysql_integration_test!();
902        let kv_backend = build_mysql_kv_backend("range2-test").await.unwrap();
903        let prefix = b"range2/";
904        test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
905        unprepare_kv(&kv_backend, prefix).await;
906    }
907
908    #[tokio::test]
909    async fn test_mysql_all_range() {
910        maybe_skip_mysql_integration_test!();
911        let kv_backend = build_mysql_kv_backend("simple_range-test").await.unwrap();
912        let prefix = b"";
913        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
914        test_simple_kv_range(&kv_backend).await;
915        unprepare_kv(&kv_backend, prefix).await;
916    }
917
918    #[tokio::test]
919    async fn test_mysql_batch_get() {
920        maybe_skip_mysql_integration_test!();
921        let kv_backend = build_mysql_kv_backend("batch_get-test").await.unwrap();
922        let prefix = b"batch_get/";
923        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
924        test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
925        unprepare_kv(&kv_backend, prefix).await;
926    }
927
928    #[tokio::test]
929    async fn test_mysql_batch_delete() {
930        maybe_skip_mysql_integration_test!();
931        let kv_backend = build_mysql_kv_backend("batch_delete-test").await.unwrap();
932        let prefix = b"batch_delete/";
933        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
934        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
935        unprepare_kv(&kv_backend, prefix).await;
936    }
937
938    #[tokio::test]
939    async fn test_mysql_batch_delete_with_prefix() {
940        maybe_skip_mysql_integration_test!();
941        let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix-test")
942            .await
943            .unwrap();
944        let prefix = b"batch_delete/";
945        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
946        test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
947        unprepare_kv(&kv_backend, prefix).await;
948    }
949
950    #[tokio::test]
951    async fn test_mysql_delete_range() {
952        maybe_skip_mysql_integration_test!();
953        let kv_backend = build_mysql_kv_backend("delete_range-test").await.unwrap();
954        let prefix = b"delete_range/";
955        prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
956        test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
957        unprepare_kv(&kv_backend, prefix).await;
958    }
959
960    #[tokio::test]
961    async fn test_mysql_compare_and_put() {
962        maybe_skip_mysql_integration_test!();
963        let kv_backend = build_mysql_kv_backend("compare_and_put-test")
964            .await
965            .unwrap();
966        let prefix = b"compare_and_put/";
967        let kv_backend = Arc::new(kv_backend);
968        test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
969    }
970
971    #[tokio::test]
972    async fn test_mysql_txn() {
973        maybe_skip_mysql_integration_test!();
974        let kv_backend = build_mysql_kv_backend("txn-test").await.unwrap();
975        test_txn_one_compare_op(&kv_backend).await;
976        text_txn_multi_compare_op(&kv_backend).await;
977        test_txn_compare_equal(&kv_backend).await;
978        test_txn_compare_greater(&kv_backend).await;
979        test_txn_compare_less(&kv_backend).await;
980        test_txn_compare_not_equal(&kv_backend).await;
981    }
982
983    #[tokio::test]
984    async fn test_mysql_with_tls() {
985        common_telemetry::init_default_ut_logging();
986        maybe_skip_mysql_integration_test!();
987        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
988
989        let opts = endpoint
990            .parse::<MySqlConnectOptions>()
991            .unwrap()
992            .ssl_mode(MySqlSslMode::Required);
993        let pool = MySqlPool::connect_with(opts).await.unwrap();
994        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
995    }
996
997    #[tokio::test]
998    async fn test_mysql_with_mtls() {
999        common_telemetry::init_default_ut_logging();
1000        maybe_skip_mysql_integration_test!();
1001        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1002        let certs_dir = test_certs_dir();
1003
1004        let opts = endpoint
1005            .parse::<MySqlConnectOptions>()
1006            .unwrap()
1007            .ssl_mode(MySqlSslMode::Required)
1008            .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1009            .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1010        let pool = MySqlPool::connect_with(opts).await.unwrap();
1011        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1012    }
1013
1014    #[tokio::test]
1015    async fn test_mysql_with_tls_verify_ca() {
1016        common_telemetry::init_default_ut_logging();
1017        maybe_skip_mysql_integration_test!();
1018        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1019        let certs_dir = test_certs_dir();
1020
1021        let opts = endpoint
1022            .parse::<MySqlConnectOptions>()
1023            .unwrap()
1024            .ssl_mode(MySqlSslMode::VerifyCa)
1025            .ssl_ca(certs_dir.join("root.crt").to_string_lossy().to_string())
1026            .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1027            .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1028        let pool = MySqlPool::connect_with(opts).await.unwrap();
1029        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1030    }
1031
1032    #[tokio::test]
1033    async fn test_mysql_with_tls_verify_ident() {
1034        common_telemetry::init_default_ut_logging();
1035        maybe_skip_mysql_integration_test!();
1036        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1037        let certs_dir = test_certs_dir();
1038
1039        let opts = endpoint
1040            .parse::<MySqlConnectOptions>()
1041            .unwrap()
1042            .ssl_mode(MySqlSslMode::VerifyIdentity)
1043            .ssl_ca(certs_dir.join("root.crt").to_string_lossy().to_string())
1044            .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1045            .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1046        let pool = MySqlPool::connect_with(opts).await.unwrap();
1047        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1048    }
1049}