Skip to main content

meta_srv/utils/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::election::ElectionRef;
16use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
17use common_meta::kv_backend::KvBackendRef;
18use common_meta::kv_backend::rds::MySqlStore;
19use common_telemetry::info;
20use servers::tls::{TlsMode, TlsOption};
21use snafu::{OptionExt, ResultExt};
22use sqlx::mysql::{MySqlConnectOptions, MySqlPool, MySqlSslMode};
23
24use crate::error::{self, Result};
25
26async fn setup_mysql_options(
27    store_addrs: &[String],
28    tls_config: Option<&TlsOption>,
29) -> Result<MySqlConnectOptions> {
30    let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
31        err_msg: "empty store addrs",
32    })?;
33    // Avoid `SET` commands in sqlx
34    let opts: MySqlConnectOptions = mysql_url
35        .parse()
36        .context(error::ParseMySqlUrlSnafu { mysql_url })?;
37    let mut opts = opts
38        .no_engine_substitution(false)
39        .pipes_as_concat(false)
40        .timezone(None)
41        .set_names(false);
42
43    let Some(tls_config) = tls_config else {
44        return Ok(opts);
45    };
46
47    match tls_config.mode {
48        TlsMode::Disable => return Ok(opts),
49        TlsMode::Prefer => {
50            opts = opts.ssl_mode(MySqlSslMode::Preferred);
51        }
52        TlsMode::Require => {
53            opts = opts.ssl_mode(MySqlSslMode::Required);
54        }
55        TlsMode::VerifyCa => {
56            opts = opts.ssl_mode(MySqlSslMode::VerifyCa);
57            opts = opts.ssl_ca(&tls_config.ca_cert_path);
58        }
59        TlsMode::VerifyFull => {
60            opts = opts.ssl_mode(MySqlSslMode::VerifyIdentity);
61            opts = opts.ssl_ca(&tls_config.ca_cert_path);
62        }
63    }
64    info!(
65        "Setting up MySQL options with TLS mode: {:?}",
66        tls_config.mode
67    );
68
69    if !tls_config.cert_path.is_empty() && !tls_config.key_path.is_empty() {
70        info!("Loading client certificate for mutual TLS");
71        opts = opts.ssl_client_cert(&tls_config.cert_path);
72        opts = opts.ssl_client_key(&tls_config.key_path);
73    }
74
75    Ok(opts)
76}
77
78/// Creates a MySQL pool.
79pub async fn create_mysql_pool(
80    store_addrs: &[String],
81    tls_config: Option<&TlsOption>,
82) -> Result<MySqlPool> {
83    let opts = setup_mysql_options(store_addrs, tls_config).await?;
84    let pool = MySqlPool::connect_with(opts)
85        .await
86        .context(error::CreateMySqlPoolSnafu)?;
87
88    Ok(pool)
89}
90
91/// Builds a MySQL-backed metadata [`KvBackendRef`].
92///
93/// * `store_addrs` - MySQL connection URLs; only the first address is used.
94/// * `tls_config` - optional TLS settings for the MySQL connection.
95/// * `table_name` - metadata KV table name.
96/// * `max_txn_ops` - maximum operations allowed in one metadata transaction.
97pub async fn build_mysql_kv_backend(
98    store_addrs: &[String],
99    tls_config: Option<&TlsOption>,
100    table_name: &str,
101    max_txn_ops: usize,
102) -> Result<KvBackendRef> {
103    let pool = create_mysql_pool(store_addrs, tls_config).await?;
104    MySqlStore::with_mysql_pool(pool, table_name, max_txn_ops)
105        .await
106        .context(error::KvBackendSnafu)
107}
108
109/// Builds a MySQL-backed election implementation.
110///
111/// * `store_addrs` - MySQL connection URLs; only the first address is used.
112/// * `tls_config` - optional TLS settings for the MySQL connection.
113/// * `leader_value` - advertised address of this election candidate.
114/// * `store_key_prefix` - prefix for election and candidate keys.
115/// * `candidate_lease_ttl` - TTL for registered candidate metadata.
116/// * `meta_lease_ttl` - TTL for the elected leader metadata.
117/// * `election_table_name` - dedicated table used for election locking and records.
118/// * `innodb_lock_wait_timeout` - session lock wait timeout for election transactions.
119#[allow(clippy::too_many_arguments)]
120pub async fn build_mysql_election(
121    store_addrs: &[String],
122    tls_config: Option<&TlsOption>,
123    leader_value: String,
124    store_key_prefix: String,
125    candidate_lease_ttl: std::time::Duration,
126    meta_lease_ttl: std::time::Duration,
127    election_table_name: &str,
128    innodb_lock_wait_timeout: std::time::Duration,
129) -> Result<ElectionRef> {
130    let pool = create_mysql_pool(store_addrs, tls_config).await?;
131    let election_client = ElectionMysqlClient::new(
132        pool,
133        meta_lease_ttl,
134        meta_lease_ttl,
135        innodb_lock_wait_timeout,
136        meta_lease_ttl,
137        election_table_name,
138    );
139    MySqlElection::with_mysql_client(
140        leader_value,
141        election_client,
142        store_key_prefix,
143        candidate_lease_ttl,
144        meta_lease_ttl,
145        election_table_name,
146    )
147    .await
148    .context(error::KvBackendSnafu)
149}