1use std::marker::PhantomData;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info, warn};
19use lazy_static::lazy_static;
20use regex::Regex;
21use snafu::{OptionExt, ResultExt};
22use sqlx::mysql::MySqlRow;
23use sqlx::pool::Pool;
24use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
25use strum::AsRefStr;
26
27use crate::error::{
28 CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result, UnexpectedSnafu,
29};
30use crate::kv_backend::KvBackendRef;
31use crate::kv_backend::rds::{
32 Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
33 RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE,
34 RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction,
35 ensure_rustls_crypto_provider_installed,
36};
37use crate::rpc::KeyValue;
38use crate::rpc::store::{
39 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
40 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
41};
42
/// Store label handed to `record_rds_sql_execute_elapsed!` for every statement
/// this backend executes.
const MYSQL_STORE_NAME: &str = "mysql_store";
44
/// Blob flavour of the value column `v`, as recognised in a table definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValueBlobType {
    /// The column is declared as `BLOB`.
    Blob,
    /// The column is declared as `MEDIUMBLOB`.
    MediumBlob,
    /// The column is declared as `LONGBLOB`.
    LongBlob,
}
51
52lazy_static! {
53 static ref VALUE_COLUMN_BLOB_TYPE_RE: Regex =
54 Regex::new(r#"(?i)(?:\(|,)\s*[`"]?v[`"]?\s+(longblob|mediumblob|blob)\b"#).unwrap();
55}
56
57type MySqlClient = Arc<Pool<MySql>>;
58pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
59
60fn key_value_from_row(row: MySqlRow) -> KeyValue {
61 KeyValue {
63 key: row.get_unchecked(0),
64 value: row.get_unchecked(1),
65 }
66}
67
/// A single zero byte. `range_template` treats a key or range end equal to this
/// value as "unbounded" when picking the statement shape.
const EMPTY: &[u8] = &[0];
69
70#[derive(Debug, Clone, Copy, AsRefStr)]
72enum RangeTemplateType {
73 Point,
74 Range,
75 Full,
76 LeftBounded,
77 Prefix,
78}
79
80impl RangeTemplateType {
82 fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
85 match self {
86 RangeTemplateType::Point => vec![key],
87 RangeTemplateType::Range => vec![key, range_end],
88 RangeTemplateType::Full => vec![],
89 RangeTemplateType::LeftBounded => vec![key],
90 RangeTemplateType::Prefix => {
91 key.push(b'%');
92 vec![key]
93 }
94 }
95 }
96}
97
/// One SQL statement per range shape (point, range, full, left-bounded, prefix).
#[derive(Debug, Clone)]
struct RangeTemplate {
    /// Statement for a single key.
    point: String,
    /// Statement for a half-open key interval.
    range: String,
    /// Statement covering the whole table.
    full: String,
    /// Statement for every key at or after a start key.
    left_bounded: String,
    /// Statement for every key matching a pattern.
    prefix: String,
}
107
108impl RangeTemplate {
109 fn get(&self, typ: RangeTemplateType) -> &str {
111 match typ {
112 RangeTemplateType::Point => &self.point,
113 RangeTemplateType::Range => &self.range,
114 RangeTemplateType::Full => &self.full,
115 RangeTemplateType::LeftBounded => &self.left_bounded,
116 RangeTemplateType::Prefix => &self.prefix,
117 }
118 }
119
120 fn with_limit(template: &str, limit: i64) -> String {
122 if limit == 0 {
123 return format!("{};", template);
124 }
125 format!("{} LIMIT {};", template, limit)
126 }
127}
128
/// Returns `true` when `[start, end)` is exactly the set of keys prefixed by `start`,
/// i.e. both slices have the same length, agree on everything but the last byte, and
/// the last byte of `end` is the last byte of `start` plus one.
///
/// Empty inputs and a `start` ending in `0xFF` (which has no successor byte) are
/// never prefix ranges; neither case panics.
fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
    if start.len() != end.len() {
        return false;
    }
    match (start.split_last(), end.split_last()) {
        (Some((start_last, start_head)), Some((end_last, end_head))) => {
            // `checked_add` avoids the u8 overflow when the last byte is 0xFF.
            start_head == end_head && start_last.checked_add(1) == Some(*end_last)
        }
        // Both slices are empty: there is no last byte to compare.
        _ => false,
    }
}
140
141fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
143 match (key, range_end) {
144 (_, &[]) => RangeTemplateType::Point,
145 (EMPTY, EMPTY) => RangeTemplateType::Full,
146 (_, EMPTY) => RangeTemplateType::LeftBounded,
147 (start, end) => {
148 if is_prefix_range(start, end) {
149 RangeTemplateType::Prefix
150 } else {
151 RangeTemplateType::Range
152 }
153 }
154 }
155}
156
/// Returns one `"?"` placeholder for every index in the inclusive range
/// `from..=to`; the result is empty when `from > to`.
fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
    let mut placeholders = Vec::new();
    for _ in from..=to {
        placeholders.push(String::from("?"));
    }
    placeholders
}
161
/// Builds the SQL statements of a [`MySqlTemplateSet`] for one table.
struct MySqlTemplateFactory<'a> {
    /// Name of the table every generated statement targets.
    table_name: &'a str,
}
166
167impl<'a> MySqlTemplateFactory<'a> {
168 fn new(table_name: &'a str) -> Self {
170 Self { table_name }
171 }
172
173 fn build(&self) -> MySqlTemplateSet {
175 let table_name = self.table_name;
176 MySqlTemplateSet {
178 table_name: table_name.to_string(),
179 create_table_statement: format!(
180 "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v MEDIUMBLOB);",
182 ),
183 show_create_table_statement: format!("SHOW CREATE TABLE `{table_name}`"),
184 alter_value_column_statement: format!(
185 "ALTER TABLE `{table_name}` MODIFY COLUMN v MEDIUMBLOB;"
186 ),
187 range_template: RangeTemplate {
188 point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
189 range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
190 full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
191 left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
192 prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
193 },
194 delete_template: RangeTemplate {
195 point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
196 range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
197 full: format!("DELETE FROM `{table_name}`"),
198 left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
199 prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
200 },
201 }
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct MySqlTemplateSet {
208 table_name: String,
209 create_table_statement: String,
210 show_create_table_statement: String,
211 alter_value_column_statement: String,
212 range_template: RangeTemplate,
213 delete_template: RangeTemplate,
214}
215
216impl MySqlTemplateSet {
217 fn generate_batch_get_query(&self, key_len: usize) -> String {
219 let table_name = &self.table_name;
220 let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
221 format!(
222 "SELECT k, v FROM `{table_name}` WHERE k in ({});",
223 in_clause
224 )
225 }
226
227 fn generate_batch_delete_query(&self, key_len: usize) -> String {
229 let table_name = &self.table_name;
230 let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
231 format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
232 }
233
234 fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
237 let table_name = &self.table_name;
238 let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
239 let in_clause = in_placeholders.join(", ");
240 let mut values_placeholders = Vec::new();
241 for _ in 0..kv_len {
242 values_placeholders.push("(?, ?)".to_string());
243 }
244 let values_clause = values_placeholders.join(", ");
245
246 (
247 format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
248 format!(
249 r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
250 ),
251 )
252 }
253}
254
255#[async_trait::async_trait]
256impl Executor for MySqlClient {
257 type Transaction<'a>
258 = MySqlTxnClient
259 where
260 Self: 'a;
261
262 fn name() -> &'static str {
263 "MySql"
264 }
265
266 async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
267 let query = sqlx::query(raw_query);
268 let query = params.iter().fold(query, |query, param| query.bind(param));
269 let rows = query
270 .fetch_all(&**self)
271 .await
272 .context(MySqlExecutionSnafu { sql: raw_query })?;
273 Ok(rows.into_iter().map(key_value_from_row).collect())
274 }
275
276 async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
277 let query = sqlx::query(raw_query);
278 let query = params.iter().fold(query, |query, param| query.bind(param));
279 query
280 .execute(&**self)
281 .await
282 .context(MySqlExecutionSnafu { sql: raw_query })?;
283 Ok(())
284 }
285
286 async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
287 sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
290 .execute(&**self)
291 .await
292 .context(MySqlExecutionSnafu {
293 sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
294 })?;
295 let txn = self
296 .begin()
297 .await
298 .context(MySqlExecutionSnafu { sql: "begin" })?;
299 Ok(MySqlTxnClient(txn))
300 }
301}
302
303#[async_trait::async_trait]
304impl Transaction<'_> for MySqlTxnClient {
305 async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
306 let query = sqlx::query(raw_query);
307 let query = params.iter().fold(query, |query, param| query.bind(param));
308 let rows = query
310 .fetch_all(&mut *(self.0))
311 .await
312 .context(MySqlExecutionSnafu { sql: raw_query })?;
313 Ok(rows.into_iter().map(key_value_from_row).collect())
314 }
315
316 async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
317 let query = sqlx::query(raw_query);
318 let query = params.iter().fold(query, |query, param| query.bind(param));
319 query
321 .execute(&mut *(self.0))
322 .await
323 .context(MySqlExecutionSnafu { sql: raw_query })?;
324 Ok(())
325 }
326
327 async fn commit(self) -> Result<()> {
330 self.0.commit().await.context(MySqlTransactionSnafu {
331 operation: "commit",
332 })?;
333 Ok(())
334 }
335}
336
337pub struct MySqlExecutorFactory {
338 pool: Arc<Pool<MySql>>,
339}
340
341#[async_trait::async_trait]
342impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
343 async fn default_executor(&self) -> Result<MySqlClient> {
344 Ok(self.pool.clone())
345 }
346
347 async fn txn_executor<'a>(
348 &self,
349 default_executor: &'a mut MySqlClient,
350 ) -> Result<MySqlTxnClient> {
351 default_executor.txn_executor().await
352 }
353}
354
355pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
358
359#[async_trait::async_trait]
360impl KvQueryExecutor<MySqlClient> for MySqlStore {
361 async fn range_with_query_executor(
362 &self,
363 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
364 req: RangeRequest,
365 ) -> Result<RangeResponse> {
366 let template_type = range_template(&req.key, &req.range_end);
367 let template = self.sql_template_set.range_template.get(template_type);
368 let params = template_type.build_params(req.key, req.range_end);
369 let params_ref = params.iter().collect::<Vec<_>>();
370 let query =
372 RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
373 let limit = req.limit as usize;
374 debug!("query: {:?}, params: {:?}", query, params);
375 let mut kvs = crate::record_rds_sql_execute_elapsed!(
376 query_executor.query(&query, ¶ms_ref).await,
377 MYSQL_STORE_NAME,
378 RDS_STORE_OP_RANGE_QUERY,
379 template_type.as_ref()
380 )?;
381 if req.keys_only {
382 kvs.iter_mut().for_each(|kv| kv.value = vec![]);
383 }
384 if limit == 0 || kvs.len() <= limit {
386 return Ok(RangeResponse { kvs, more: false });
387 }
388 let removed = kvs.pop();
390 debug_assert!(removed.is_some());
391 Ok(RangeResponse { kvs, more: true })
392 }
393
394 async fn batch_put_with_query_executor(
395 &self,
396 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
397 req: BatchPutRequest,
398 ) -> Result<BatchPutResponse> {
399 let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
400 let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
401
402 for kv in &req.kvs {
403 let processed_key = &kv.key;
404 in_params.push(processed_key);
405
406 let processed_value = &kv.value;
407 values_params.push(processed_key);
408 values_params.push(processed_value);
409 }
410 let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
411 let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
412 let (select, update) = self
413 .sql_template_set
414 .generate_batch_upsert_query(req.kvs.len());
415
416 if !req.prev_kv {
418 crate::record_rds_sql_execute_elapsed!(
419 query_executor.execute(&update, &values_params).await,
420 MYSQL_STORE_NAME,
421 RDS_STORE_OP_BATCH_PUT,
422 ""
423 )?;
424 return Ok(BatchPutResponse::default());
425 }
426 if let ExecutorImpl::Default(query_executor) = query_executor {
428 let txn = query_executor.txn_executor().await?;
429 let mut txn = ExecutorImpl::Txn(txn);
430 let res = self.batch_put_with_query_executor(&mut txn, req).await;
431 txn.commit().await?;
432 return res;
433 }
434 let prev_kvs = crate::record_rds_sql_execute_elapsed!(
435 query_executor.query(&select, &in_params).await,
436 MYSQL_STORE_NAME,
437 RDS_STORE_OP_BATCH_PUT,
438 ""
439 )?;
440 query_executor.execute(&update, &values_params).await?;
441 Ok(BatchPutResponse { prev_kvs })
442 }
443
444 async fn batch_get_with_query_executor(
445 &self,
446 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
447 req: BatchGetRequest,
448 ) -> Result<BatchGetResponse> {
449 if req.keys.is_empty() {
450 return Ok(BatchGetResponse { kvs: vec![] });
451 }
452 let query = self
453 .sql_template_set
454 .generate_batch_get_query(req.keys.len());
455 let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
456 let kvs = crate::record_rds_sql_execute_elapsed!(
457 query_executor.query(&query, ¶ms).await,
458 MYSQL_STORE_NAME,
459 RDS_STORE_OP_BATCH_GET,
460 ""
461 )?;
462 Ok(BatchGetResponse { kvs })
463 }
464
465 async fn delete_range_with_query_executor(
466 &self,
467 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
468 req: DeleteRangeRequest,
469 ) -> Result<DeleteRangeResponse> {
470 if let ExecutorImpl::Default(query_executor) = query_executor {
473 let txn = query_executor.txn_executor().await?;
474 let mut txn = ExecutorImpl::Txn(txn);
475 let res = self.delete_range_with_query_executor(&mut txn, req).await;
476 txn.commit().await?;
477 return res;
478 }
479 let range_get_req = RangeRequest {
480 key: req.key.clone(),
481 range_end: req.range_end.clone(),
482 limit: 0,
483 keys_only: false,
484 };
485 let prev_kvs = self
486 .range_with_query_executor(query_executor, range_get_req)
487 .await?
488 .kvs;
489 let template_type = range_template(&req.key, &req.range_end);
490 let template = self.sql_template_set.delete_template.get(template_type);
491 let params = template_type.build_params(req.key, req.range_end);
492 let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
493 crate::record_rds_sql_execute_elapsed!(
494 query_executor.execute(template, ¶ms_ref).await,
495 MYSQL_STORE_NAME,
496 RDS_STORE_OP_RANGE_DELETE,
497 template_type.as_ref()
498 )?;
499 let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
500 if req.prev_kv {
501 resp.with_prev_kvs(prev_kvs);
502 }
503 Ok(resp)
504 }
505
506 async fn batch_delete_with_query_executor(
507 &self,
508 query_executor: &mut ExecutorImpl<'_, MySqlClient>,
509 req: BatchDeleteRequest,
510 ) -> Result<BatchDeleteResponse> {
511 if req.keys.is_empty() {
512 return Ok(BatchDeleteResponse::default());
513 }
514 let query = self
515 .sql_template_set
516 .generate_batch_delete_query(req.keys.len());
517 let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
518 if !req.prev_kv {
520 crate::record_rds_sql_execute_elapsed!(
521 query_executor.execute(&query, ¶ms).await,
522 MYSQL_STORE_NAME,
523 RDS_STORE_OP_BATCH_DELETE,
524 ""
525 )?;
526 return Ok(BatchDeleteResponse::default());
527 }
528 if let ExecutorImpl::Default(query_executor) = query_executor {
530 let txn = query_executor.txn_executor().await?;
531 let mut txn = ExecutorImpl::Txn(txn);
532 let res = self.batch_delete_with_query_executor(&mut txn, req).await;
533 txn.commit().await?;
534 return res;
535 }
536 let batch_get_req = BatchGetRequest {
538 keys: req.keys.clone(),
539 };
540 let prev_kvs = self
541 .batch_get_with_query_executor(query_executor, batch_get_req)
542 .await?
543 .kvs;
544 crate::record_rds_sql_execute_elapsed!(
546 query_executor.execute(&query, ¶ms).await,
547 MYSQL_STORE_NAME,
548 RDS_STORE_OP_BATCH_DELETE,
549 ""
550 )?;
551 if req.prev_kv {
552 Ok(BatchDeleteResponse { prev_kvs })
553 } else {
554 Ok(BatchDeleteResponse::default())
555 }
556 }
557}
558
559impl MySqlStore {
560 async fn fetch_create_table_sql(
562 pool: &Pool<MySql>,
563 sql_template_set: &MySqlTemplateSet,
564 ) -> Result<Option<String>> {
565 let row = sqlx::query(&sql_template_set.show_create_table_statement)
566 .fetch_optional(pool)
567 .await
568 .with_context(|_| MySqlExecutionSnafu {
569 sql: sql_template_set.show_create_table_statement.clone(),
570 })?;
571 Ok(row.map(|row| row.get(1)))
572 }
573
574 fn parse_value_column_blob_type(create_table_sql: &str) -> Option<ValueBlobType> {
576 let captures = VALUE_COLUMN_BLOB_TYPE_RE.captures(create_table_sql)?;
579 match captures.get(1)?.as_str().to_ascii_lowercase().as_str() {
580 "blob" => Some(ValueBlobType::Blob),
581 "mediumblob" => Some(ValueBlobType::MediumBlob),
582 "longblob" => Some(ValueBlobType::LongBlob),
583 _ => None,
584 }
585 }
586
587 async fn maybe_upgrade_value_column_to_mediumblob(
589 pool: &Pool<MySql>,
590 sql_template_set: &MySqlTemplateSet,
591 ) -> Result<()> {
592 let table_name = &sql_template_set.table_name;
593 let create_table_sql = Self::fetch_create_table_sql(pool, sql_template_set)
594 .await?
595 .context(UnexpectedSnafu {
596 err_msg: format!("Failed to fetch CREATE TABLE SQL for `{table_name}`"),
597 })?;
598
599 match Self::parse_value_column_blob_type(&create_table_sql) {
600 Some(ValueBlobType::Blob) => {
601 sqlx::query(&sql_template_set.alter_value_column_statement)
602 .execute(pool)
603 .await
604 .with_context(|_| MySqlExecutionSnafu {
605 sql: sql_template_set.alter_value_column_statement.clone(),
606 })?;
607 info!("Upgraded MySQL metadata value column to MEDIUMBLOB for `{table_name}`");
608 }
609 Some(ValueBlobType::MediumBlob | ValueBlobType::LongBlob) => {
610 debug!("MySQL metadata value column for `{table_name}` is already compatible");
611 }
612 None => {
613 warn!(
614 "Failed to determine MySQL metadata value column type from table definition for `{table_name}`, skip automatic MEDIUMBLOB upgrade"
615 );
616 }
617 }
618
619 Ok(())
620 }
621
622 pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
624 ensure_rustls_crypto_provider_installed()?;
625 let pool = MySqlPool::connect(url)
626 .await
627 .context(CreateMySqlPoolSnafu)?;
628 Self::with_mysql_pool(pool, table_name, max_txn_ops).await
629 }
630
631 pub async fn with_mysql_pool(
633 pool: Pool<MySql>,
634 table_name: &str,
635 max_txn_ops: usize,
636 ) -> Result<KvBackendRef> {
637 let sql_template_set = MySqlTemplateFactory::new(table_name).build();
641 sqlx::query(&sql_template_set.create_table_statement)
642 .execute(&pool)
643 .await
644 .with_context(|_| MySqlExecutionSnafu {
645 sql: sql_template_set.create_table_statement.clone(),
646 })?;
647 Self::maybe_upgrade_value_column_to_mediumblob(&pool, &sql_template_set).await?;
648 Ok(Arc::new(MySqlStore {
649 max_txn_ops,
650 sql_template_set,
651 txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
652 executor_factory: MySqlExecutorFactory {
653 pool: Arc::new(pool),
654 },
655 _phantom: PhantomData,
656 }))
657 }
658}
659
#[cfg(test)]
mod tests {
    use common_telemetry::init_default_ut_logging;
    use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode};
    use uuid::Uuid;

    use super::*;
    use crate::kv_backend::test::{
        prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
        test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
        test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
        test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
        test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
        text_txn_multi_compare_op, unprepare_kv,
    };
    use crate::maybe_skip_mysql_integration_test;
    use crate::rpc::store::{PutRequest, RangeRequest};
    use crate::test_util::test_certs_dir;

    /// Builds a unique table name `<prefix>_<uuid>`, truncating `prefix` so the
    /// whole name stays within 63 bytes.
    fn new_test_table_name(prefix: &str) -> String {
        let uuid = Uuid::new_v4().simple().to_string();
        let max_prefix_len = 63usize.saturating_sub(uuid.len() + 1);
        let kept = prefix.len().min(max_prefix_len);
        let prefix = &prefix[..kept];
        format!("{prefix}_{uuid}")
    }

    /// Connects to the server named by `GT_MYSQL_ENDPOINTS`; `None` when the
    /// variable is unset or empty.
    async fn mysql_pool() -> Option<MySqlPool> {
        init_default_ut_logging();
        let endpoints = match std::env::var("GT_MYSQL_ENDPOINTS") {
            Ok(value) if !value.is_empty() => value,
            _ => return None,
        };
        ensure_rustls_crypto_provider_installed().unwrap();
        let pool = MySqlPool::connect(&endpoints).await.unwrap();
        Some(pool)
    }

    /// Returns the `CREATE TABLE` text reported by the server for `table_name`.
    async fn show_create_table(pool: &MySqlPool, table_name: &str) -> String {
        let sql = format!("SHOW CREATE TABLE `{table_name}`");
        sqlx::query(&sql)
            .fetch_one(pool)
            .await
            .unwrap()
            .get::<String, _>(1)
    }

    /// Creates the table with the legacy `BLOB` value column.
    async fn create_legacy_blob_table(pool: &MySqlPool, table_name: &str) {
        let sql = format!(
            "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);"
        );
        sqlx::query(&sql).execute(pool).await.unwrap();
    }

    /// Drops the table if it exists.
    async fn drop_table(pool: &MySqlPool, table_name: &str) {
        let sql = format!("DROP TABLE IF EXISTS `{table_name}`;");
        sqlx::query(&sql).execute(pool).await.unwrap();
    }

    /// Creates `table_name` and wraps it in a `MySqlStore` without running the
    /// column upgrade; `None` when no test server is configured.
    async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
        let pool = mysql_pool().await?;
        let sql_template_set = MySqlTemplateFactory::new(table_name).build();
        sqlx::query(&sql_template_set.create_table_statement)
            .execute(&pool)
            .await
            .unwrap();
        let executor_factory = MySqlExecutorFactory {
            pool: Arc::new(pool),
        };
        Some(MySqlStore {
            max_txn_ops: 128,
            sql_template_set,
            txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
            executor_factory,
            _phantom: PhantomData,
        })
    }

    /// Writes a 70 KiB value filled with `fill` under a fixed key and checks that
    /// a point read returns exactly that pair.
    async fn assert_large_value_round_trip(kv_backend: &KvBackendRef, fill: u8) {
        let key = b"large-value".to_vec();
        let value = vec![fill; 70 * 1024];

        kv_backend
            .put(
                PutRequest::new()
                    .with_key(key.clone())
                    .with_value(value.clone()),
            )
            .await
            .unwrap();
        let response = kv_backend
            .range(RangeRequest::new().with_key(key.clone()))
            .await
            .unwrap();

        assert_eq!(1, response.kvs.len());
        assert_eq!(key, response.kvs[0].key);
        assert_eq!(value, response.kvs[0].value);
    }

    /// Opens a pool with `opts` and runs `SELECT 1` through it.
    async fn connect_and_ping(opts: MySqlConnectOptions) {
        let pool = MySqlPool::connect_with(opts).await.unwrap();
        sqlx::query("SELECT 1").execute(&pool).await.unwrap();
    }

    #[test]
    fn test_parse_value_column_blob_type() {
        // (table definition, expected blob type of column `v`)
        let cases: [(&str, Option<ValueBlobType>); 9] = [
            (
                r#"CREATE TABLE `greptime_metakv` (
 `k` varbinary(3072) NOT NULL,
 `v` MEDIUMBLOB,
 PRIMARY KEY (`k`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"#,
                Some(ValueBlobType::MediumBlob),
            ),
            (
                r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` blob, PRIMARY KEY (`k`))"#,
                Some(ValueBlobType::Blob),
            ),
            (
                r#"CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` longblob, PRIMARY KEY (`k`))"#,
                Some(ValueBlobType::LongBlob),
            ),
            (
                "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` BLOB NOT NULL, PRIMARY KEY (`k`))",
                Some(ValueBlobType::Blob),
            ),
            (
                "CREATE TABLE `greptime_metakv` (\n `k` varbinary(3072) NOT NULL,\n \"v\" MediumBlob,\n PRIMARY KEY (`k`)\n)",
                Some(ValueBlobType::MediumBlob),
            ),
            (
                "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, vv blob, `v` longblob, PRIMARY KEY (`k`))",
                Some(ValueBlobType::LongBlob),
            ),
            (
                "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, value blob, PRIMARY KEY (`k`))",
                None,
            ),
            (
                "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, `v` varchar(255), PRIMARY KEY (`k`))",
                None,
            ),
            (
                "CREATE TABLE `greptime_metakv` (`k` varbinary(3072) NOT NULL, PRIMARY KEY (`k`))",
                None,
            ),
        ];

        for (sql, expected) in cases {
            assert_eq!(expected, MySqlStore::parse_value_column_blob_type(sql));
        }
    }

    #[tokio::test]
    async fn test_mysql_new_metadata_table_uses_mediumblob() {
        maybe_skip_mysql_integration_test!();
        let pool = mysql_pool().await.unwrap();
        let table_name = new_test_table_name("test_mysql_mediumblob_schema");

        MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
            .await
            .unwrap();

        let definition = show_create_table(&pool, &table_name).await;
        assert!(definition.to_ascii_uppercase().contains("MEDIUMBLOB"));

        drop_table(&pool, &table_name).await;
    }

    #[tokio::test]
    async fn test_mysql_legacy_blob_metadata_table_is_upgraded() {
        maybe_skip_mysql_integration_test!();
        let pool = mysql_pool().await.unwrap();
        let table_name = new_test_table_name("test_mysql_legacy_blob_upgrade");

        create_legacy_blob_table(&pool, &table_name).await;
        MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
            .await
            .unwrap();

        let definition = show_create_table(&pool, &table_name).await;
        assert!(definition.to_ascii_uppercase().contains("MEDIUMBLOB"));

        drop_table(&pool, &table_name).await;
    }

    #[tokio::test]
    async fn test_mysql_metadata_table_stores_large_values() {
        maybe_skip_mysql_integration_test!();
        let pool = mysql_pool().await.unwrap();
        let table_name = new_test_table_name("test_mysql_large_metadata_value");
        let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
            .await
            .unwrap();

        assert_large_value_round_trip(&kv_backend, b'x').await;

        drop_table(&pool, &table_name).await;
    }

    #[tokio::test]
    async fn test_mysql_upgraded_metadata_table_stores_large_values() {
        maybe_skip_mysql_integration_test!();
        let pool = mysql_pool().await.unwrap();
        let table_name = new_test_table_name("test_mysql_upgraded_large_metadata_value");

        create_legacy_blob_table(&pool, &table_name).await;
        let kv_backend = MySqlStore::with_mysql_pool(pool.clone(), &table_name, 128)
            .await
            .unwrap();

        assert_large_value_round_trip(&kv_backend, b'y').await;

        drop_table(&pool, &table_name).await;
    }

    #[tokio::test]
    async fn test_mysql_put() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("put-test").await.unwrap();
        let prefix = b"put/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_put_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_range() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("range-test").await.unwrap();
        let prefix = b"range/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_range_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_range_2() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("range2-test").await.unwrap();
        let prefix = b"range2/";
        test_kv_range_2_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_all_range() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("simple_range-test").await.unwrap();
        let prefix = b"";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_simple_kv_range(&store).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_batch_get() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("batch_get-test").await.unwrap();
        let prefix = b"batch_get/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_batch_get_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    // NOTE(review): despite its name this test runs the delete-range scenario,
    // same as `test_mysql_delete_range` — confirm whether that is intended.
    #[tokio::test]
    async fn test_mysql_batch_delete() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("batch_delete-test").await.unwrap();
        let prefix = b"batch_delete/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_delete_range_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_batch_delete_with_prefix() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("batch_delete_with_prefix-test")
            .await
            .unwrap();
        let prefix = b"batch_delete/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_batch_delete_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_delete_range() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("delete_range-test").await.unwrap();
        let prefix = b"delete_range/";
        prepare_kv_with_prefix(&store, prefix.to_vec()).await;
        test_kv_delete_range_with_prefix(&store, prefix.to_vec()).await;
        unprepare_kv(&store, prefix).await;
    }

    #[tokio::test]
    async fn test_mysql_compare_and_put() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("compare_and_put-test")
            .await
            .unwrap();
        let prefix = b"compare_and_put/";
        let shared_store = Arc::new(store);
        test_kv_compare_and_put_with_prefix(shared_store.clone(), prefix.to_vec()).await;
    }

    #[tokio::test]
    async fn test_mysql_txn() {
        maybe_skip_mysql_integration_test!();
        let store = build_mysql_kv_backend("txn-test").await.unwrap();
        test_txn_one_compare_op(&store).await;
        text_txn_multi_compare_op(&store).await;
        test_txn_compare_equal(&store).await;
        test_txn_compare_greater(&store).await;
        test_txn_compare_less(&store).await;
        test_txn_compare_not_equal(&store).await;
    }

    #[tokio::test]
    async fn test_mysql_with_tls() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_mysql_integration_test!();
        ensure_rustls_crypto_provider_installed().unwrap();
        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();

        let base = endpoint.parse::<MySqlConnectOptions>().unwrap();
        let opts = base.ssl_mode(MySqlSslMode::Required);
        connect_and_ping(opts).await;
    }

    #[tokio::test]
    async fn test_mysql_with_mtls() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_mysql_integration_test!();
        ensure_rustls_crypto_provider_installed().unwrap();
        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
        let certs_dir = test_certs_dir();
        let pem = |name: &str| certs_dir.join(name).to_string_lossy().to_string();

        let base = endpoint.parse::<MySqlConnectOptions>().unwrap();
        let opts = base
            .ssl_mode(MySqlSslMode::Required)
            .ssl_client_cert(pem("client.crt"))
            .ssl_client_key(pem("client.key"));
        connect_and_ping(opts).await;
    }

    #[tokio::test]
    async fn test_mysql_with_tls_verify_ca() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_mysql_integration_test!();
        ensure_rustls_crypto_provider_installed().unwrap();
        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
        let certs_dir = test_certs_dir();
        let pem = |name: &str| certs_dir.join(name).to_string_lossy().to_string();

        let base = endpoint.parse::<MySqlConnectOptions>().unwrap();
        let opts = base
            .ssl_mode(MySqlSslMode::VerifyCa)
            .ssl_ca(pem("root.crt"))
            .ssl_client_cert(pem("client.crt"))
            .ssl_client_key(pem("client.key"));
        connect_and_ping(opts).await;
    }

    #[tokio::test]
    async fn test_mysql_with_tls_verify_ident() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_mysql_integration_test!();
        ensure_rustls_crypto_provider_installed().unwrap();
        let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
        let certs_dir = test_certs_dir();
        let pem = |name: &str| certs_dir.join(name).to_string_lossy().to_string();

        let base = endpoint.parse::<MySqlConnectOptions>().unwrap();
        let opts = base
            .ssl_mode(MySqlSslMode::VerifyIdentity)
            .ssl_ca(pem("root.crt"))
            .ssl_client_cert(pem("client.crt"))
            .ssl_client_key(pem("client.key"));
        connect_and_ping(opts).await;
    }
}