1use std::marker::PhantomData;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info, warn};
19use lazy_static::lazy_static;
20use regex::Regex;
21use snafu::{OptionExt, ResultExt};
22use sqlx::mysql::MySqlRow;
23use sqlx::pool::Pool;
24use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
25use strum::AsRefStr;
26
27use crate::error::{
28 CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result, UnexpectedSnafu,
29};
30use crate::kv_backend::KvBackendRef;
31use crate::kv_backend::rds::{
32 Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
33 RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE,
34 RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction,
35};
36use crate::rpc::KeyValue;
37use crate::rpc::store::{
38 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
39 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
40};
41
42const MYSQL_STORE_NAME: &str = "mysql_store";
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45enum ValueBlobType {
46 Blob,
47 MediumBlob,
48 LongBlob,
49}
50
51lazy_static! {
52 static ref VALUE_COLUMN_BLOB_TYPE_RE: Regex =
53 Regex::new(r#"(?i)(?:\(|,)\s*[`"]?v[`"]?\s+(longblob|mediumblob|blob)\b"#).unwrap();
54}
55
56type MySqlClient = Arc<Pool<MySql>>;
57pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
58
59fn key_value_from_row(row: MySqlRow) -> KeyValue {
60 KeyValue {
62 key: row.get_unchecked(0),
63 value: row.get_unchecked(1),
64 }
65}
66
67const EMPTY: &[u8] = &[0];
68
69#[derive(Debug, Clone, Copy, AsRefStr)]
71enum RangeTemplateType {
72 Point,
73 Range,
74 Full,
75 LeftBounded,
76 Prefix,
77}
78
79impl RangeTemplateType {
81 fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
84 match self {
85 RangeTemplateType::Point => vec![key],
86 RangeTemplateType::Range => vec![key, range_end],
87 RangeTemplateType::Full => vec![],
88 RangeTemplateType::LeftBounded => vec![key],
89 RangeTemplateType::Prefix => {
90 key.push(b'%');
91 vec![key]
92 }
93 }
94 }
95}
96
97#[derive(Debug, Clone)]
99struct RangeTemplate {
100 point: String,
101 range: String,
102 full: String,
103 left_bounded: String,
104 prefix: String,
105}
106
107impl RangeTemplate {
108 fn get(&self, typ: RangeTemplateType) -> &str {
110 match typ {
111 RangeTemplateType::Point => &self.point,
112 RangeTemplateType::Range => &self.range,
113 RangeTemplateType::Full => &self.full,
114 RangeTemplateType::LeftBounded => &self.left_bounded,
115 RangeTemplateType::Prefix => &self.prefix,
116 }
117 }
118
119 fn with_limit(template: &str, limit: i64) -> String {
121 if limit == 0 {
122 return format!("{};", template);
123 }
124 format!("{} LIMIT {};", template, limit)
125 }
126}
127
128fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
129 if start.len() != end.len() {
130 return false;
131 }
132 let l = start.len();
133 let same_prefix = start[0..l - 1] == end[0..l - 1];
134 if let (Some(rhs), Some(lhs)) = (start.last(), end.last()) {
135 return same_prefix && (*rhs + 1) == *lhs;
136 }
137 false
138}
139
140fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
142 match (key, range_end) {
143 (_, &[]) => RangeTemplateType::Point,
144 (EMPTY, EMPTY) => RangeTemplateType::Full,
145 (_, EMPTY) => RangeTemplateType::LeftBounded,
146 (start, end) => {
147 if is_prefix_range(start, end) {
148 RangeTemplateType::Prefix
149 } else {
150 RangeTemplateType::Range
151 }
152 }
153 }
154}
155
156fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
158 (from..=to).map(|_| "?".to_string()).collect()
159}
160
161struct MySqlTemplateFactory<'a> {
163 table_name: &'a str,
164}
165
166impl<'a> MySqlTemplateFactory<'a> {
167 fn new(table_name: &'a str) -> Self {
169 Self { table_name }
170 }
171
172 fn build(&self) -> MySqlTemplateSet {
174 let table_name = self.table_name;
175 MySqlTemplateSet {
177 table_name: table_name.to_string(),
178 create_table_statement: format!(
179 "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v MEDIUMBLOB);",
181 ),
182 show_create_table_statement: format!("SHOW CREATE TABLE `{table_name}`"),
183 alter_value_column_statement: format!(
184 "ALTER TABLE `{table_name}` MODIFY COLUMN v MEDIUMBLOB;"
185 ),
186 range_template: RangeTemplate {
187 point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
188 range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
189 full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
190 left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
191 prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
192 },
193 delete_template: RangeTemplate {
194 point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
195 range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
196 full: format!("DELETE FROM `{table_name}`"),
197 left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
198 prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
199 },
200 }
201 }
202}
203
204#[derive(Debug, Clone)]
206pub struct MySqlTemplateSet {
207 table_name: String,
208 create_table_statement: String,
209 show_create_table_statement: String,
210 alter_value_column_statement: String,
211 range_template: RangeTemplate,
212 delete_template: RangeTemplate,
213}
214
215impl MySqlTemplateSet {
216 fn generate_batch_get_query(&self, key_len: usize) -> String {
218 let table_name = &self.table_name;
219 let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
220 format!(
221 "SELECT k, v FROM `{table_name}` WHERE k in ({});",
222 in_clause
223 )
224 }
225
226 fn generate_batch_delete_query(&self, key_len: usize) -> String {
228 let table_name = &self.table_name;
229 let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
230 format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
231 }
232
233 fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
236 let table_name = &self.table_name;
237 let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
238 let in_clause = in_placeholders.join(", ");
239 let mut values_placeholders = Vec::new();
240 for _ in 0..kv_len {
241 values_placeholders.push("(?, ?)".to_string());
242 }
243 let values_clause = values_placeholders.join(", ");
244
245 (
246 format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
247 format!(
248 r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
249 ),
250 )
251 }
252}
253
254#[async_trait::async_trait]
255impl Executor for MySqlClient {
256 type Transaction<'a>
257 = MySqlTxnClient
258 where
259 Self: 'a;
260
261 fn name() -> &'static str {
262 "MySql"
263 }
264
265 async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
266 let query = sqlx::query(raw_query);
267 let query = params.iter().fold(query, |query, param| query.bind(param));
268 let rows = query
269 .fetch_all(&**self)
270 .await
271 .context(MySqlExecutionSnafu { sql: raw_query })?;
272 Ok(rows.into_iter().map(key_value_from_row).collect())
273 }
274
275 async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
276 let query = sqlx::query(raw_query);
277 let query = params.iter().fold(query, |query, param| query.bind(param));
278 query
279 .execute(&**self)
280 .await
281 .context(MySqlExecutionSnafu { sql: raw_query })?;
282 Ok(())
283 }
284
285 async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
286 sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
289 .execute(&**self)
290 .await
291 .context(MySqlExecutionSnafu {
292 sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
293 })?;
294 let txn = self
295 .begin()
296 .await
297 .context(MySqlExecutionSnafu { sql: "begin" })?;
298 Ok(MySqlTxnClient(txn))
299 }
300}
301
302#[async_trait::async_trait]
303impl Transaction<'_> for MySqlTxnClient {
304 async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
305 let query = sqlx::query(raw_query);
306 let query = params.iter().fold(query, |query, param| query.bind(param));
307 let rows = query
309 .fetch_all(&mut *(self.0))
310 .await
311 .context(MySqlExecutionSnafu { sql: raw_query })?;
312 Ok(rows.into_iter().map(key_value_from_row).collect())
313 }
314
315 async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
316 let query = sqlx::query(raw_query);
317 let query = params.iter().fold(query, |query, param| query.bind(param));
318 query
320 .execute(&mut *(self.0))
321 .await
322 .context(MySqlExecutionSnafu { sql: raw_query })?;
323 Ok(())
324 }
325
326 async fn commit(self) -> Result<()> {
329 self.0.commit().await.context(MySqlTransactionSnafu {
330 operation: "commit",
331 })?;
332 Ok(())
333 }
334}
335
336pub struct MySqlExecutorFactory {
337 pool: Arc<Pool<MySql>>,
338}
339
340#[async_trait::async_trait]
341impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
342 async fn default_executor(&self) -> Result<MySqlClient> {
343 Ok(self.pool.clone())
344 }
345
346 async fn txn_executor<'a>(
347 &self,
348 default_executor: &'a mut MySqlClient,
349 ) -> Result<MySqlTxnClient> {
350 default_executor.txn_executor().await
351 }
352}
353
354pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
357
358#[async_trait::async_trait]
359impl KvQueryExecutor<MySqlClient> for MySqlStore {
360 async fn range_with_query_executor(
361 &self,
362 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
363 req: RangeRequest,
364 ) -> Result<RangeResponse> {
365 let template_type = range_template(&req.key, &req.range_end);
366 let template = self.sql_template_set.range_template.get(template_type);
367 let params = template_type.build_params(req.key, req.range_end);
368 let params_ref = params.iter().collect::<Vec<_>>();
369 let query =
371 RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
372 let limit = req.limit as usize;
373 debug!("query: {:?}, params: {:?}", query, params);
374 let mut kvs = crate::record_rds_sql_execute_elapsed!(
375 query_executor.query(&query, ¶ms_ref).await,
376 MYSQL_STORE_NAME,
377 RDS_STORE_OP_RANGE_QUERY,
378 template_type.as_ref()
379 )?;
380 if req.keys_only {
381 kvs.iter_mut().for_each(|kv| kv.value = vec![]);
382 }
383 if limit == 0 || kvs.len() <= limit {
385 return Ok(RangeResponse { kvs, more: false });
386 }
387 let removed = kvs.pop();
389 debug_assert!(removed.is_some());
390 Ok(RangeResponse { kvs, more: true })
391 }
392
393 async fn batch_put_with_query_executor(
394 &self,
395 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
396 req: BatchPutRequest,
397 ) -> Result<BatchPutResponse> {
398 let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
399 let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
400
401 for kv in &req.kvs {
402 let processed_key = &kv.key;
403 in_params.push(processed_key);
404
405 let processed_value = &kv.value;
406 values_params.push(processed_key);
407 values_params.push(processed_value);
408 }
409 let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
410 let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
411 let (select, update) = self
412 .sql_template_set
413 .generate_batch_upsert_query(req.kvs.len());
414
415 if !req.prev_kv {
417 crate::record_rds_sql_execute_elapsed!(
418 query_executor.execute(&update, &values_params).await,
419 MYSQL_STORE_NAME,
420 RDS_STORE_OP_BATCH_PUT,
421 ""
422 )?;
423 return Ok(BatchPutResponse::default());
424 }
425 if let ExecutorImpl::Default(query_executor) = query_executor {
427 let txn = query_executor.txn_executor().await?;
428 let mut txn = ExecutorImpl::Txn(txn);
429 let res = self.batch_put_with_query_executor(&mut txn, req).await;
430 txn.commit().await?;
431 return res;
432 }
433 let prev_kvs = crate::record_rds_sql_execute_elapsed!(
434 query_executor.query(&select, &in_params).await,
435 MYSQL_STORE_NAME,
436 RDS_STORE_OP_BATCH_PUT,
437 ""
438 )?;
439 query_executor.execute(&update, &values_params).await?;
440 Ok(BatchPutResponse { prev_kvs })
441 }
442
443 async fn batch_get_with_query_executor(
444 &self,
445 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
446 req: BatchGetRequest,
447 ) -> Result<BatchGetResponse> {
448 if req.keys.is_empty() {
449 return Ok(BatchGetResponse { kvs: vec![] });
450 }
451 let query = self
452 .sql_template_set
453 .generate_batch_get_query(req.keys.len());
454 let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
455 let kvs = crate::record_rds_sql_execute_elapsed!(
456 query_executor.query(&query, ¶ms).await,
457 MYSQL_STORE_NAME,
458 RDS_STORE_OP_BATCH_GET,
459 ""
460 )?;
461 Ok(BatchGetResponse { kvs })
462 }
463
464 async fn delete_range_with_query_executor(
465 &self,
466 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
467 req: DeleteRangeRequest,
468 ) -> Result<DeleteRangeResponse> {
469 if let ExecutorImpl::Default(query_executor) = query_executor {
472 let txn = query_executor.txn_executor().await?;
473 let mut txn = ExecutorImpl::Txn(txn);
474 let res = self.delete_range_with_query_executor(&mut txn, req).await;
475 txn.commit().await?;
476 return res;
477 }
478 let range_get_req = RangeRequest {
479 key: req.key.clone(),
480 range_end: req.range_end.clone(),
481 limit: 0,
482 keys_only: false,
483 };
484 let prev_kvs = self
485 .range_with_query_executor(query_executor, range_get_req)
486 .await?
487 .kvs;
488 let template_type = range_template(&req.key, &req.range_end);
489 let template = self.sql_template_set.delete_template.get(template_type);
490 let params = template_type.build_params(req.key, req.range_end);
491 let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
492 crate::record_rds_sql_execute_elapsed!(
493 query_executor.execute(template, ¶ms_ref).await,
494 MYSQL_STORE_NAME,
495 RDS_STORE_OP_RANGE_DELETE,
496 template_type.as_ref()
497 )?;
498 let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
499 if req.prev_kv {
500 resp.with_prev_kvs(prev_kvs);
501 }
502 Ok(resp)
503 }
504
505 async fn batch_delete_with_query_executor(
506 &self,
507 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
508 req: BatchDeleteRequest,
509 ) -> Result<BatchDeleteResponse> {
510 if req.keys.is_empty() {
511 return Ok(BatchDeleteResponse::default());
512 }
513 let query = self
514 .sql_template_set
515 .generate_batch_delete_query(req.keys.len());
516 let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
517 if !req.prev_kv {
519 crate::record_rds_sql_execute_elapsed!(
520 query_executor.execute(&query, ¶ms).await,
521 MYSQL_STORE_NAME,
522 RDS_STORE_OP_BATCH_DELETE,
523 ""
524 )?;
525 return Ok(BatchDeleteResponse::default());
526 }
527 if let ExecutorImpl::Default(query_executor) = query_executor {
529 let txn = query_executor.txn_executor().await?;
530 let mut txn = ExecutorImpl::Txn(txn);
531 let res = self.batch_delete_with_query_executor(&mut txn, req).await;
532 txn.commit().await?;
533 return res;
534 }
535 let batch_get_req = BatchGetRequest {
537 keys: req.keys.clone(),
538 };
539 let prev_kvs = self
540 .batch_get_with_query_executor(query_executor, batch_get_req)
541 .await?
542 .kvs;
543 crate::record_rds_sql_execute_elapsed!(
545 query_executor.execute(&query, ¶ms).await,
546 MYSQL_STORE_NAME,
547 RDS_STORE_OP_BATCH_DELETE,
548 ""
549 )?;
550 if req.prev_kv {
551 Ok(BatchDeleteResponse { prev_kvs })
552 } else {
553 Ok(BatchDeleteResponse::default())
554 }
555 }
556}
557
558impl MySqlStore {
559 async fn fetch_create_table_sql(
561 pool: &Pool<MySql>,
562 sql_template_set: &MySqlTemplateSet,
563 ) -> Result<Option<String>> {
564 let row = sqlx::query(&sql_template_set.show_create_table_statement)
565 .fetch_optional(pool)
566 .await
567 .with_context(|_| MySqlExecutionSnafu {
568 sql: sql_template_set.show_create_table_statement.clone(),
569 })?;
570 Ok(row.map(|row| row.get(1)))
571 }
572
573 fn parse_value_column_blob_type(create_table_sql: &str) -> Option<ValueBlobType> {
575 let captures = VALUE_COLUMN_BLOB_TYPE_RE.captures(create_table_sql)?;
578 match captures.get(1)?.as_str().to_ascii_lowercase().as_str() {
579 "blob" => Some(ValueBlobType::Blob),
580 "mediumblob" => Some(ValueBlobType::MediumBlob),
581 "longblob" => Some(ValueBlobType::LongBlob),
582 _ => None,
583 }
584 }
585
586 async fn maybe_upgrade_value_column_to_mediumblob(
588 pool: &Pool<MySql>,
589 sql_template_set: &MySqlTemplateSet,
590 ) -> Result<()> {
591 let table_name = &sql_template_set.table_name;
592 let create_table_sql = Self::fetch_create_table_sql(pool, sql_template_set)
593 .await?
594 .context(UnexpectedSnafu {
595 err_msg: format!("Failed to fetch CREATE TABLE SQL for `{table_name}`"),
596 })?;
597
598 match Self::parse_value_column_blob_type(&create_table_sql) {
599 Some(ValueBlobType::Blob) => {
600 sqlx::query(&sql_template_set.alter_value_column_statement)
601 .execute(pool)
602 .await
603 .with_context(|_| MySqlExecutionSnafu {
604 sql: sql_template_set.alter_value_column_statement.clone(),
605 })?;
606 info!("Upgraded MySQL metadata value column to MEDIUMBLOB for `{table_name}`");
607 }
608 Some(ValueBlobType::MediumBlob | ValueBlobType::LongBlob) => {
609 debug!("MySQL metadata value column for `{table_name}` is already compatible");
610 }
611 None => {
612 warn!(
613 "Failed to determine MySQL metadata value column type from table definition for `{table_name}`, skip automatic MEDIUMBLOB upgrade"
614 );
615 }
616 }
617
618 Ok(())
619 }
620
621 pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
623 let pool = MySqlPool::connect(url)
624 .await
625 .context(CreateMySqlPoolSnafu)?;
626 Self::with_mysql_pool(pool, table_name, max_txn_ops).await
627 }
628
629 pub async fn with_mysql_pool(
631 pool: Pool<MySql>,
632 table_name: &str,
633 max_txn_ops: usize,
634 ) -> Result<KvBackendRef> {
635 let sql_template_set = MySqlTemplateFactory::new(table_name).build();
639 sqlx::query(&sql_template_set.create_table_statement)
640 .execute(&pool)
641 .await
642 .with_context(|_| MySqlExecutionSnafu {
643 sql: sql_template_set.create_table_statement.clone(),
644 })?;
645 Self::maybe_upgrade_value_column_to_mediumblob(&pool, &sql_template_set).await?;
646 Ok(Arc::new(MySqlStore {
647 max_txn_ops,
648 sql_template_set,
649 txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
650 executor_factory: MySqlExecutorFactory {
651 pool: Arc::new(pool),
652 },
653 _phantom: PhantomData,
654 }))
655 }
656}
657
658#[cfg(test)]
659mod tests {
660 use common_telemetry::init_default_ut_logging;
661 use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode};
662 use uuid::Uuid;
663
664 use super::*;
665 use crate::kv_backend::test::{
666 prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
667 test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
668 test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
669 test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
670 test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
671 text_txn_multi_compare_op, unprepare_kv,
672 };
673 use crate::maybe_skip_mysql_integration_test;
674 use crate::rpc::store::{PutRequest, RangeRequest};
675 use crate::test_util::test_certs_dir;
676
677 fn new_test_table_name(prefix: &str) -> String {
678 let uuid = Uuid::new_v4().simple().to_string();
679 let max_prefix_len = 63usize.saturating_sub(uuid.len() + 1);
680 let prefix = &prefix[..prefix.len().min(max_prefix_len)];
681 format!("{prefix}_{uuid}")
682 }
683
684 async fn mysql_pool() -> Option<MySqlPool> {
685 init_default_ut_logging();
686 let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
687 if endpoints.is_empty() {
688 return None;
689 }
690 Some(MySqlPool::connect(&endpoints).await.unwrap())
691 }
692
693 async fn show_create_table(pool: &MySqlPool, table_name: &str) -> String {
694 let sql = format!("SHOW CREATE TABLE `{table_name}`");
695 let row = sqlx::query(&sql).fetch_one(pool).await.unwrap();
696 row.get::<String, _>(1)
697 }
698
699 async fn create_legacy_blob_table(pool: &MySqlPool, table_name: &str) {
700 let sql = format!(
701 "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);"
702 );
703 sqlx::query(&sql).execute(pool).await.unwrap();
704 }
705
706 async fn drop_table(pool: &MySqlPool, table_name: &str) {
707 let sql = format!("DROP TABLE IF EXISTS `{table_name}`;");
708 sqlx::query(&sql).execute(pool).await.unwrap();
709 }
710
711 async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
712 let pool = mysql_pool().await?;
713 let sql_templates = MySqlTemplateFactory::new(table_name).build();
714 sqlx::query(&sql_templates.create_table_statement)
715 .execute(&pool)
716 .await
717 .unwrap();
718 Some(MySqlStore {
719 max_txn_ops: 128,
720 sql_template_set: sql_templates,
721 txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
722 executor_factory: MySqlExecutorFactory {
723 pool: Arc::new(pool),
724 },
725 _phantom: PhantomData,
726 })
727 }
728
729 #[test]
730 fn test_parse_value_column_blob_type() {
731 let sql = r#"CREATE TABLE `greptime_metakv` (
732 `k` varbinary(3072) NOT NULL,
733 `v` MEDIUMBLOB,
734 PRIMARY KEY (`k`)
735) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"#;
736 assert_eq!(
737 Some(ValueBlobType::MediumBlob),
738 MySqlStore::parse_value_column_blob_type(sql)
739 );
740
741 let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` blob, PRIMARY KEY (`k`))"#;
742 assert_eq!(
743 Some(ValueBlobType::Blob),
744 MySqlStore::parse_value_column_blob_type(sql)
745 );
746
747 let sql = r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` longblob, PRIMARY KEY (`k`))"#;
748 assert_eq!(
749 Some(ValueBlobType::LongBlob),
750 MySqlStore::parse_value_column_blob_type(sql)
751 );
752
753 let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` BLOB NOT NULL, PRIMARY KEY (`k`))";
754 assert_eq!(
755 Some(ValueBlobType::Blob),
756 MySqlStore::parse_value_column_blob_type(sql)
757 );
758
759 let sql = "CREATE TABLE `greptime_metakv` (\n `k` varbinary(3072) NOT NULL,\n \"v\" MediumBlob,\n PRIMARY KEY (`k`)\n)";
760 assert_eq!(
761 Some(ValueBlobType::MediumBlob),
762 MySqlStore::parse_value_column_blob_type(sql)
763 );
764
765 let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, vv blob, `v` longblob, PRIMARY KEY (`k`))";
766 assert_eq!(
767 Some(ValueBlobType::LongBlob),
768 MySqlStore::parse_value_column_blob_type(sql)
769 );
770
771 let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, value blob, PRIMARY KEY (`k`))";
772 assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
773
774 let sql = "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` varchar(255), PRIMARY KEY (`k`))";
775 assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
776
777 let sql =
778 "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, PRIMARY KEY (`k`))";
779 assert_eq!(None, MySqlStore::parse_value_column_blob_type(sql));
780 }
781
782 #[tokio::test]
783 async fn test_mysql_new_metadata_table_uses_mediumblob() {
784 maybe_skip_mysql_integration_test!();
785 let pool = mysql_pool().await.unwrap();
786 let table_name = new_test_table_name("test_mysql_mediumblob_schema");
787
788 MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
789 .await
790 .unwrap();
791
792 let create_table = show_create_table(&pool, &table_name).await;
793 assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB"));
794
795 drop_table(&pool, &table_name).await;
796 }
797
798 #[tokio::test]
799 async fn test_mysql_legacy_blob_metadata_table_is_upgraded() {
800 maybe_skip_mysql_integration_test!();
801 let pool = mysql_pool().await.unwrap();
802 let table_name = new_test_table_name("test_mysql_legacy_blob_upgrade");
803
804 create_legacy_blob_table(&pool, &table_name).await;
805 MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
806 .await
807 .unwrap();
808
809 let create_table = show_create_table(&pool, &table_name).await;
810 assert!(create_table.to_ascii_uppercase().contains("MEDIUMBLOB"));
811
812 drop_table(&pool, &table_name).await;
813 }
814
815 #[tokio::test]
816 async fn test_mysql_metadata_table_stores_large_values() {
817 maybe_skip_mysql_integration_test!();
818 let pool = mysql_pool().await.unwrap();
819 let table_name = new_test_table_name("test_mysql_large_metadata_value");
820 let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
821 .await
822 .unwrap();
823 let key = b"large-value".to_vec();
824 let value = vec![b'x'; 70 * 1024];
825
826 kv_backend
827 .put(
828 PutRequest::new()
829 .with_key(key.clone())
830 .with_value(value.clone()),
831 )
832 .await
833 .unwrap();
834 let response = kv_backend
835 .range(RangeRequest::new().with_key(key.clone()))
836 .await
837 .unwrap();
838
839 assert_eq!(1, response.kvs.len());
840 assert_eq!(key, response.kvs[0].key);
841 assert_eq!(value, response.kvs[0].value);
842
843 drop_table(&pool, &table_name).await;
844 }
845
846 #[tokio::test]
847 async fn test_mysql_upgraded_metadata_table_stores_large_values() {
848 maybe_skip_mysql_integration_test!();
849 let pool = mysql_pool().await.unwrap();
850 let table_name = new_test_table_name("test_mysql_upgraded_large_metadata_value");
851
852 create_legacy_blob_table(&pool, &table_name).await;
853 let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
854 .await
855 .unwrap();
856 let key = b"large-value".to_vec();
857 let value = vec![b'y'; 70 * 1024];
858
859 kv_backend
860 .put(
861 PutRequest::new()
862 .with_key(key.clone())
863 .with_value(value.clone()),
864 )
865 .await
866 .unwrap();
867 let response = kv_backend
868 .range(RangeRequest::new().with_key(key.clone()))
869 .await
870 .unwrap();
871
872 assert_eq!(1, response.kvs.len());
873 assert_eq!(key, response.kvs[0].key);
874 assert_eq!(value, response.kvs[0].value);
875
876 drop_table(&pool, &table_name).await;
877 }
878
879 #[tokio::test]
880 async fn test_mysql_put() {
881 maybe_skip_mysql_integration_test!();
882 let kv_backend = build_mysql_kv_backend("put-test").await.unwrap();
883 let prefix = b"put/";
884 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
885 test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
886 unprepare_kv(&kv_backend, prefix).await;
887 }
888
889 #[tokio::test]
890 async fn test_mysql_range() {
891 maybe_skip_mysql_integration_test!();
892 let kv_backend = build_mysql_kv_backend("range-test").await.unwrap();
893 let prefix = b"range/";
894 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
895 test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
896 unprepare_kv(&kv_backend, prefix).await;
897 }
898
899 #[tokio::test]
900 async fn test_mysql_range_2() {
901 maybe_skip_mysql_integration_test!();
902 let kv_backend = build_mysql_kv_backend("range2-test").await.unwrap();
903 let prefix = b"range2/";
904 test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
905 unprepare_kv(&kv_backend, prefix).await;
906 }
907
908 #[tokio::test]
909 async fn test_mysql_all_range() {
910 maybe_skip_mysql_integration_test!();
911 let kv_backend = build_mysql_kv_backend("simple_range-test").await.unwrap();
912 let prefix = b"";
913 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
914 test_simple_kv_range(&kv_backend).await;
915 unprepare_kv(&kv_backend, prefix).await;
916 }
917
918 #[tokio::test]
919 async fn test_mysql_batch_get() {
920 maybe_skip_mysql_integration_test!();
921 let kv_backend = build_mysql_kv_backend("batch_get-test").await.unwrap();
922 let prefix = b"batch_get/";
923 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
924 test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
925 unprepare_kv(&kv_backend, prefix).await;
926 }
927
928 #[tokio::test]
929 async fn test_mysql_batch_delete() {
930 maybe_skip_mysql_integration_test!();
931 let kv_backend = build_mysql_kv_backend("batch_delete-test").await.unwrap();
932 let prefix = b"batch_delete/";
933 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
934 test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
935 unprepare_kv(&kv_backend, prefix).await;
936 }
937
938 #[tokio::test]
939 async fn test_mysql_batch_delete_with_prefix() {
940 maybe_skip_mysql_integration_test!();
941 let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix-test")
942 .await
943 .unwrap();
944 let prefix = b"batch_delete/";
945 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
946 test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
947 unprepare_kv(&kv_backend, prefix).await;
948 }
949
950 #[tokio::test]
951 async fn test_mysql_delete_range() {
952 maybe_skip_mysql_integration_test!();
953 let kv_backend = build_mysql_kv_backend("delete_range-test").await.unwrap();
954 let prefix = b"delete_range/";
955 prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
956 test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
957 unprepare_kv(&kv_backend, prefix).await;
958 }
959
960 #[tokio::test]
961 async fn test_mysql_compare_and_put() {
962 maybe_skip_mysql_integration_test!();
963 let kv_backend = build_mysql_kv_backend("compare_and_put-test")
964 .await
965 .unwrap();
966 let prefix = b"compare_and_put/";
967 let kv_backend = Arc::new(kv_backend);
968 test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
969 }
970
971 #[tokio::test]
972 async fn test_mysql_txn() {
973 maybe_skip_mysql_integration_test!();
974 let kv_backend = build_mysql_kv_backend("txn-test").await.unwrap();
975 test_txn_one_compare_op(&kv_backend).await;
976 text_txn_multi_compare_op(&kv_backend).await;
977 test_txn_compare_equal(&kv_backend).await;
978 test_txn_compare_greater(&kv_backend).await;
979 test_txn_compare_less(&kv_backend).await;
980 test_txn_compare_not_equal(&kv_backend).await;
981 }
982
983 #[tokio::test]
984 async fn test_mysql_with_tls() {
985 common_telemetry::init_default_ut_logging();
986 maybe_skip_mysql_integration_test!();
987 let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
988
989 let opts = endpoint
990 .parse::<MySqlConnectOptions>()
991 .unwrap()
992 .ssl_mode(MySqlSslMode::Required);
993 let pool = MySqlPool::connect_with(opts).await.unwrap();
994 sqlx::query("SELECT 1").execute(&pool).await.unwrap();
995 }
996
997 #[tokio::test]
998 async fn test_mysql_with_mtls() {
999 common_telemetry::init_default_ut_logging();
1000 maybe_skip_mysql_integration_test!();
1001 let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1002 let certs_dir = test_certs_dir();
1003
1004 let opts = endpoint
1005 .parse::<MySqlConnectOptions>()
1006 .unwrap()
1007 .ssl_mode(MySqlSslMode::Required)
1008 .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1009 .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1010 let pool = MySqlPool::connect_with(opts).await.unwrap();
1011 sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1012 }
1013
1014 #[tokio::test]
1015 async fn test_mysql_with_tls_verify_ca() {
1016 common_telemetry::init_default_ut_logging();
1017 maybe_skip_mysql_integration_test!();
1018 let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1019 let certs_dir = test_certs_dir();
1020
1021 let opts = endpoint
1022 .parse::<MySqlConnectOptions>()
1023 .unwrap()
1024 .ssl_mode(MySqlSslMode::VerifyCa)
1025 .ssl_ca(certs_dir.join("root.crt").to_string_lossy().to_string())
1026 .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1027 .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1028 let pool = MySqlPool::connect_with(opts).await.unwrap();
1029 sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1030 }
1031
1032 #[tokio::test]
1033 async fn test_mysql_with_tls_verify_ident() {
1034 common_telemetry::init_default_ut_logging();
1035 maybe_skip_mysql_integration_test!();
1036 let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
1037 let certs_dir = test_certs_dir();
1038
1039 let opts = endpoint
1040 .parse::<MySqlConnectOptions>()
1041 .unwrap()
1042 .ssl_mode(MySqlSslMode::VerifyIdentity)
1043 .ssl_ca(certs_dir.join("root.crt").to_string_lossy().to_string())
1044 .ssl_client_cert(certs_dir.join("client.crt").to_string_lossy().to_string())
1045 .ssl_client_key(certs_dir.join("client.key").to_string_lossy().to_string());
1046 let pool = MySqlPool::connect_with(opts).await.unwrap();
1047 sqlx::query("SELECT 1").execute(&pool).await.unwrap();
1048 }
1049}