1use common_meta::election::ElectionRef;
16use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
17use common_meta::kv_backend::KvBackendRef;
18use common_meta::kv_backend::rds::MySqlStore;
19use common_telemetry::info;
20use servers::tls::{TlsMode, TlsOption};
21use snafu::{OptionExt, ResultExt};
22use sqlx::mysql::{MySqlConnectOptions, MySqlPool, MySqlSslMode};
23
24use crate::error::{self, Result};
25
26async fn setup_mysql_options(
27 store_addrs: &[String],
28 tls_config: Option<&TlsOption>,
29) -> Result<MySqlConnectOptions> {
30 let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
31 err_msg: "empty store addrs",
32 })?;
33 let opts: MySqlConnectOptions = mysql_url
35 .parse()
36 .context(error::ParseMySqlUrlSnafu { mysql_url })?;
37 let mut opts = opts
38 .no_engine_substitution(false)
39 .pipes_as_concat(false)
40 .timezone(None)
41 .set_names(false);
42
43 let Some(tls_config) = tls_config else {
44 return Ok(opts);
45 };
46
47 match tls_config.mode {
48 TlsMode::Disable => return Ok(opts),
49 TlsMode::Prefer => {
50 opts = opts.ssl_mode(MySqlSslMode::Preferred);
51 }
52 TlsMode::Require => {
53 opts = opts.ssl_mode(MySqlSslMode::Required);
54 }
55 TlsMode::VerifyCa => {
56 opts = opts.ssl_mode(MySqlSslMode::VerifyCa);
57 opts = opts.ssl_ca(&tls_config.ca_cert_path);
58 }
59 TlsMode::VerifyFull => {
60 opts = opts.ssl_mode(MySqlSslMode::VerifyIdentity);
61 opts = opts.ssl_ca(&tls_config.ca_cert_path);
62 }
63 }
64 info!(
65 "Setting up MySQL options with TLS mode: {:?}",
66 tls_config.mode
67 );
68
69 if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() {
70 info!("Loading client certificate for mutual TLS");
71 opts = opts.ssl_client_cert(&tls_config.cert_path);
72 opts = opts.ssl_client_key(&tls_config.key_path);
73 }
74
75 Ok(opts)
76}
77
78pub async fn create_mysql_pool(
80 store_addrs: &[String],
81 tls_config: Option<&TlsOption>,
82) -> Result<MySqlPool> {
83 let opts = setup_mysql_options(store_addrs, tls_config).await?;
84 let pool = MySqlPool::connect_with(opts)
85 .await
86 .context(error::CreateMySqlPoolSnafu)?;
87
88 Ok(pool)
89}
90
91pub async fn build_mysql_kv_backend(
98 store_addrs: &[String],
99 tls_config: Option<&TlsOption>,
100 table_name: &str,
101 max_txn_ops: usize,
102) -> Result<KvBackendRef> {
103 let pool = create_mysql_pool(store_addrs, tls_config).await?;
104 MySqlStore::with_mysql_pool(pool, table_name, max_txn_ops)
105 .await
106 .context(error::KvBackendSnafu)
107}
108
109#[allow(clippy::too_many_arguments)]
120pub async fn build_mysql_election(
121 store_addrs: &[String],
122 tls_config: Option<&TlsOption>,
123 leader_value: String,
124 store_key_prefix: String,
125 candidate_lease_ttl: std::time::Duration,
126 meta_lease_ttl: std::time::Duration,
127 election_table_name: &str,
128 innodb_lock_wait_timeout: std::time::Duration,
129) -> Result<ElectionRef> {
130 let pool = create_mysql_pool(store_addrs, tls_config).await?;
131 let election_client = ElectionMysqlClient::new(
132 pool,
133 meta_lease_ttl,
134 meta_lease_ttl,
135 innodb_lock_wait_timeout,
136 meta_lease_ttl,
137 election_table_name,
138 );
139 MySqlElection::with_mysql_client(
140 leader_value,
141 election_client,
142 store_key_prefix,
143 candidate_lease_ttl,
144 meta_lease_ttl,
145 election_table_name,
146 )
147 .await
148 .context(error::KvBackendSnafu)
149}