Skip to main content

common_meta/election/rds/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_telemetry::{error, info, warn};
20use common_time::Timestamp;
21use snafu::{OptionExt, ResultExt, ensure};
22use sqlx::mysql::{MySqlArguments, MySqlRow};
23use sqlx::pool::PoolConnection;
24use sqlx::query::Query;
25use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
26use tokio::sync::{Mutex, MutexGuard, broadcast};
27use tokio::time::MissedTickBehavior;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31    Election, ElectionRef, LeaderChangeMessage, LeaderValue, MetasrvNodeInfo, listen_leader_change,
32    send_leader_change_and_set_flags,
33};
34use crate::error::{
35    AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36    ElectionLeaderLeaseChangedSnafu, ElectionLeaderLeaseExpiredSnafu, ElectionNoLeaderSnafu,
37    MySqlExecutionSnafu, Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
40
/// Renders the MySQL statements used by the election, all targeting one election table.
struct ElectionSqlFactory<'a> {
    /// Name of the election table; interpolated (backtick-quoted) into every statement.
    table_name: &'a str,
}
44
/// SQL statements used by the MySQL election, pre-rendered for one election table.
///
/// All statements use MySQL positional `?` placeholders; the parameters listed for each
/// statement are in bind order. Values are stored as `<value><LEASE_SEP><expire time>`, where
/// the expire time is computed on the server from `NOW(4)` and formatted as `%Y-%m-%d %T.%f`.
struct ElectionSqlSet {
    /// SQL that locks every row of the election table for the rest of the current
    /// transaction (`SELECT * ... FOR UPDATE`).
    ///
    /// Parameters: none.
    campaign: String,
    /// SQL to put a value with an expire time, overwriting any existing value for the key
    /// (`INSERT ... ON DUPLICATE KEY UPDATE`).
    ///
    /// Parameters:
    /// 1. key,
    /// 2. value,
    /// 3. lease time in seconds.
    ///
    /// Returns the affected-row count: per MySQL `ON DUPLICATE KEY UPDATE` semantics, 1 when a
    /// new row was inserted and 2 when an existing row was updated. It does not return the
    /// previous value.
    put_value_with_lease: String,
    /// SQL to compare-and-set a value with a fresh expire time.
    ///
    /// Parameters:
    /// 1. updated value,
    /// 2. lease time in seconds,
    /// 3. key,
    /// 4. previous stored value (including its separator and expire-time suffix).
    ///
    /// Returns the affected-row count; 0 when the key is missing or its stored value no
    /// longer equals the previous value.
    update_value_with_lease: String,
    /// SQL to get a value with its expire time.
    ///
    /// Parameters:
    /// 1. key.
    ///
    /// Returns:
    /// column 0: stored value (value, separator and expire time),
    /// column 1: current server time.
    get_value_with_lease: String,
    /// SQL to get all values with expire time whose key matches a `LIKE` pattern.
    ///
    /// Parameters:
    /// 1. key pattern like `prefix%`.
    ///
    /// Returns:
    /// column 0: stored value (value, separator and expire time),
    /// column 1: current server time.
    get_value_with_lease_by_prefix: String,
    /// SQL to delete a value.
    ///
    /// Parameters:
    /// 1. key.
    ///
    /// Returns the affected-row count.
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90    fn new(table_name: &'a str) -> Self {
91        Self { table_name }
92    }
93
94    fn build(self) -> ElectionSqlSet {
95        ElectionSqlSet {
96            campaign: self.campaign_sql(),
97            put_value_with_lease: self.put_value_with_lease_sql(),
98            update_value_with_lease: self.update_value_with_lease_sql(),
99            get_value_with_lease: self.get_value_with_lease_sql(),
100            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101            delete_value: self.delete_value_sql(),
102        }
103    }
104
105    /// Use `SELECT FOR UPDATE` to lock for compatibility with other MySQL-compatible databases
106    /// instead of directly using `GET_LOCK`.
107    fn campaign_sql(&self) -> String {
108        format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109    }
110
111    fn put_value_with_lease_sql(&self) -> String {
112        format!(
113            r#"
114            INSERT INTO `{}` (k, v) VALUES (
115                ?,
116                CONCAT(
117                    ?,
118                    '{}',
119                    DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120                )
121            )
122            ON DUPLICATE KEY UPDATE v = VALUES(v);
123            "#,
124            self.table_name, LEASE_SEP
125        )
126    }
127
128    fn update_value_with_lease_sql(&self) -> String {
129        format!(
130            r#"UPDATE `{}`
131               SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132               WHERE k = ? AND v = ?"#,
133            self.table_name, LEASE_SEP
134        )
135    }
136
137    fn get_value_with_lease_sql(&self) -> String {
138        format!(
139            r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140            self.table_name
141        )
142    }
143
144    fn get_value_with_lease_by_prefix_sql(&self) -> String {
145        format!(
146            r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147            self.table_name
148        )
149    }
150
151    fn delete_value_sql(&self) -> String {
152        format!("DELETE FROM `{}` WHERE k = ?;", self.table_name)
153    }
154}
155
156enum Executor<'a> {
157    Default(MutexGuard<'a, ElectionMysqlClient>),
158    Txn(TransactionWithExecutionTimeout<'a>),
159}
160
161impl Executor<'_> {
162    async fn query(
163        &mut self,
164        query: Query<'_, MySql, MySqlArguments>,
165        sql: &str,
166    ) -> Result<Vec<MySqlRow>> {
167        match self {
168            Executor::Default(client) => client.query(query, sql).await,
169            Executor::Txn(txn) => txn.query(query, sql).await,
170        }
171    }
172
173    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174        match self {
175            Executor::Default(client) => client.execute(query, sql).await,
176            Executor::Txn(txn) => txn.execute(query, sql).await,
177        }
178    }
179
180    async fn commit(self) -> Result<()> {
181        match self {
182            Executor::Txn(txn) => txn.commit().await,
183            _ => Ok(()),
184        }
185    }
186}
187
188/// MySQL client for election.
189pub struct ElectionMysqlClient {
190    current: Option<PoolConnection<MySql>>,
191    pool: MySqlPool,
192
193    /// The client-side timeout for statement execution.
194    ///
195    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
196    /// If a statement takes longer than this duration to execute, the client will abort the operation.
197    execution_timeout: Duration,
198
199    /// The maximum execution time for the statement.
200    ///
201    /// This timeout is enforced by the server and is independent of any client-side timeouts.
202    /// If a statement takes longer than this duration to execute, the server will abort the operation.
203    max_execution_time: Duration,
204
205    /// The lock wait timeout for the session.
206    ///
207    /// This timeout determines how long the server waits for a lock to be acquired before timing out.
208    /// If a lock cannot be acquired within this duration, the server will abort the operation.
209    innode_lock_wait_timeout: Duration,
210
211    /// The wait timeout for the session.
212    ///
213    /// This timeout determines how long the server waits for activity on a noninteractive connection
214    /// before closing it. If a connection is idle for longer than this duration, the server will
215    /// terminate it.
216    wait_timeout: Duration,
217
218    /// The table name for election.
219    table_for_election: String,
220}
221
222impl ElectionMysqlClient {
223    pub fn new(
224        pool: MySqlPool,
225        execution_timeout: Duration,
226        max_execution_time: Duration,
227        innode_lock_wait_timeout: Duration,
228        wait_timeout: Duration,
229        table_for_election: &str,
230    ) -> Self {
231        Self {
232            current: None,
233            pool,
234            execution_timeout,
235            max_execution_time,
236            innode_lock_wait_timeout,
237            wait_timeout,
238            table_for_election: table_for_election.to_string(),
239        }
240    }
241
242    fn create_table_sql(&self) -> String {
243        format!(
244            r#"
245            CREATE TABLE IF NOT EXISTS `{}` (
246                k VARBINARY(3072) PRIMARY KEY,
247                v BLOB
248            );"#,
249            self.table_for_election
250        )
251    }
252
253    fn insert_once_sql(&self) -> String {
254        format!(
255            "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256            self.table_for_election
257        )
258    }
259
260    fn check_version_sql(&self) -> String {
261        "SELECT @@version;".to_string()
262    }
263
264    async fn reset_client(&mut self) -> Result<()> {
265        self.current = None;
266        self.maybe_init_client().await
267    }
268
269    async fn ensure_table_exists(&mut self) -> Result<()> {
270        let create_table_sql = self.create_table_sql();
271        let query = sqlx::query(&create_table_sql);
272        self.execute(query, &create_table_sql).await?;
273        // Insert at least one row for `SELECT * FOR UPDATE` to work.
274        let insert_once_sql = self.insert_once_sql();
275        let query = sqlx::query(&insert_once_sql);
276        self.execute(query, &insert_once_sql).await?;
277        Ok(())
278    }
279
280    async fn maybe_init_client(&mut self) -> Result<()> {
281        if self.current.is_none() {
282            let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284            self.current = Some(client);
285            let (query, sql) = if !self.wait_timeout.is_zero() {
286                let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287                (
288                    sqlx::query(sql)
289                        .bind(self.wait_timeout.as_secs())
290                        .bind(self.innode_lock_wait_timeout.as_secs())
291                        .bind(self.max_execution_time.as_millis() as u64),
292                    sql,
293                )
294            } else {
295                let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296                (
297                    sqlx::query(sql)
298                        .bind(self.innode_lock_wait_timeout.as_secs())
299                        .bind(self.max_execution_time.as_millis() as u64),
300                    sql,
301                )
302            };
303            self.set_session_isolation_level().await?;
304            self.execute(query, sql).await?;
305            self.check_version(&self.check_version_sql()).await?;
306        }
307        Ok(())
308    }
309
310    /// Set session isolation level to serializable.
311    ///
312    /// # Panics
313    /// if `current` is `None`.
314    async fn set_session_isolation_level(&mut self) -> Result<()> {
315        let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316        let query = sqlx::query(sql);
317        self.execute(query, sql).await?;
318        Ok(())
319    }
320
321    /// Check if the MySQL version is supported.
322    ///
323    /// # Panics
324    /// if `current` is `None`.
325    async fn check_version(&mut self, sql: &str) -> Result<()> {
326        // Check if the MySQL version is supported.
327        let query = sqlx::query(sql);
328        // Safety: `maybe_init_client` ensures `current` is not `None`.
329        let client = self.current.as_mut().unwrap();
330        let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331            .await
332            .map_err(|_| {
333                SqlExecutionTimeoutSnafu {
334                    sql,
335                    duration: self.execution_timeout,
336                }
337                .build()
338            })?;
339        match row {
340            Ok(row) => {
341                let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342                if !version.starts_with("8.0") && !version.starts_with("5.7") {
343                    warn!(
344                        "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345                        version
346                    );
347                }
348            }
349            Err(e) => {
350                warn!(e; "Failed to check MySQL version through sql: {}", sql);
351            }
352        }
353        Ok(())
354    }
355
356    /// Returns the result of the query.
357    ///
358    /// # Panics
359    /// if `current` is `None`.
360    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361        // Safety: `maybe_init_client` ensures `current` is not `None`.
362        let client = self.current.as_mut().unwrap();
363        let future = query.execute(&mut **client);
364        let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365            .await
366            .map_err(|_| {
367                SqlExecutionTimeoutSnafu {
368                    sql,
369                    duration: self.execution_timeout,
370                }
371                .build()
372            })?
373            .context(MySqlExecutionSnafu { sql })?
374            .rows_affected();
375        Ok(rows_affected)
376    }
377
378    /// Returns the result of the query.
379    ///
380    /// # Panics
381    /// if `current` is `None`.
382    async fn query(
383        &mut self,
384        query: Query<'_, MySql, MySqlArguments>,
385        sql: &str,
386    ) -> Result<Vec<MySqlRow>> {
387        // Safety: `maybe_init_client` ensures `current` is not `None`.
388        let client = self.current.as_mut().unwrap();
389        let future = query.fetch_all(&mut **client);
390        tokio::time::timeout(self.execution_timeout, future)
391            .await
392            .map_err(|_| {
393                SqlExecutionTimeoutSnafu {
394                    sql,
395                    duration: self.execution_timeout,
396                }
397                .build()
398            })?
399            .context(MySqlExecutionSnafu { sql })
400    }
401
402    async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403        use sqlx::Acquire;
404        let client = self.current.as_mut().unwrap();
405        let transaction = client
406            .begin()
407            .await
408            .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410        Ok(TransactionWithExecutionTimeout {
411            transaction,
412            execution_timeout: self.execution_timeout,
413        })
414    }
415}
416
417struct TransactionWithExecutionTimeout<'a> {
418    transaction: MySqlTransaction<'a>,
419    execution_timeout: Duration,
420}
421
422impl TransactionWithExecutionTimeout<'_> {
423    async fn query(
424        &mut self,
425        query: Query<'_, MySql, MySqlArguments>,
426        sql: &str,
427    ) -> Result<Vec<MySqlRow>> {
428        let res = tokio::time::timeout(
429            self.execution_timeout,
430            query.fetch_all(&mut *self.transaction),
431        )
432        .await
433        .map_err(|_| {
434            SqlExecutionTimeoutSnafu {
435                sql,
436                duration: self.execution_timeout,
437            }
438            .build()
439        })?
440        .context(MySqlExecutionSnafu { sql })?;
441        Ok(res)
442    }
443
444    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445        let res = tokio::time::timeout(
446            self.execution_timeout,
447            query.execute(&mut *self.transaction),
448        )
449        .await
450        .map_err(|_| {
451            SqlExecutionTimeoutSnafu {
452                sql,
453                duration: self.execution_timeout,
454            }
455            .build()
456        })?
457        .context(MySqlExecutionSnafu { sql })?;
458        Ok(res.rows_affected())
459    }
460
461    async fn commit(self) -> Result<()> {
462        tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463            .await
464            .map_err(|_| {
465                SqlExecutionTimeoutSnafu {
466                    sql: "COMMIT",
467                    duration: self.execution_timeout,
468                }
469                .build()
470            })?
471            .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472        Ok(())
473    }
474}
475
476/// MySQL implementation of Election.
477pub struct MySqlElection {
478    leader_value: String,
479    client: Mutex<ElectionMysqlClient>,
480    is_leader: AtomicBool,
481    leader_infancy: AtomicBool,
482    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
483    store_key_prefix: String,
484    candidate_lease_ttl: Duration,
485    meta_lease_ttl: Duration,
486    sql_set: ElectionSqlSet,
487}
488
489impl MySqlElection {
490    pub async fn with_mysql_client(
491        leader_value: String,
492        mut client: ElectionMysqlClient,
493        store_key_prefix: String,
494        candidate_lease_ttl: Duration,
495        meta_lease_ttl: Duration,
496        table_name: &str,
497    ) -> Result<ElectionRef> {
498        let sql_factory = ElectionSqlFactory::new(table_name);
499        client.maybe_init_client().await?;
500        client.ensure_table_exists().await?;
501        let tx = listen_leader_change(leader_value.clone());
502        Ok(Arc::new(Self {
503            leader_value,
504            client: Mutex::new(client),
505            is_leader: AtomicBool::new(false),
506            leader_infancy: AtomicBool::new(false),
507            leader_watcher: tx,
508            store_key_prefix,
509            candidate_lease_ttl,
510            meta_lease_ttl,
511            sql_set: sql_factory.build(),
512        }))
513    }
514
515    fn election_key(&self) -> String {
516        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517    }
518
519    fn candidate_root(&self) -> String {
520        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521    }
522
523    fn candidate_key(&self) -> String {
524        format!("{}{}", self.candidate_root(), self.leader_value)
525    }
526
527    async fn maybe_init_client(&self) -> Result<()> {
528        let mut client = self.client.lock().await;
529        client.maybe_init_client().await?;
530        Ok(())
531    }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536    type Leader = LeaderValue;
537
538    fn is_leader(&self) -> bool {
539        self.is_leader.load(Ordering::Relaxed)
540    }
541
542    fn in_leader_infancy(&self) -> bool {
543        self.leader_infancy
544            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545            .is_ok()
546    }
547
548    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549        self.maybe_init_client().await?;
550        let key = self.candidate_key();
551        let node_info =
552            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553                input: format!("{node_info:?}"),
554            })?;
555
556        {
557            let client = self.client.lock().await;
558            let mut executor = Executor::Default(client);
559            let res = self
560                .put_value_with_lease(
561                    &key,
562                    &node_info,
563                    self.candidate_lease_ttl.as_secs(),
564                    &mut executor,
565                )
566                .await?;
567            // May registered before, just update the lease.
568            if !res {
569                warn!("Candidate already registered, update the lease");
570                self.delete_value(&key, &mut executor).await?;
571                self.put_value_with_lease(
572                    &key,
573                    &node_info,
574                    self.candidate_lease_ttl.as_secs(),
575                    &mut executor,
576                )
577                .await?;
578            }
579        }
580
581        // Check if the current lease has expired and renew the lease.
582        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583        loop {
584            let _ = keep_alive_interval.tick().await;
585            let client = self.client.lock().await;
586            let mut executor = Executor::Default(client);
587            let lease = self
588                .get_value_with_lease(&key, &mut executor)
589                .await?
590                .unwrap_or_default();
591
592            ensure!(
593                lease.expire_time > lease.current,
594                UnexpectedSnafu {
595                    err_msg: format!(
596                        "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597                        lease.expire_time,
598                        lease.current,
599                        String::from_utf8_lossy(&key.into_bytes())
600                    ),
601                }
602            );
603
604            self.update_value_with_lease(
605                &key,
606                &lease.origin,
607                &node_info,
608                self.candidate_lease_ttl.as_secs(),
609                &mut executor,
610            )
611            .await?;
612            std::mem::drop(executor);
613        }
614    }
615
616    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617        self.maybe_init_client().await?;
618        let key_prefix = self.candidate_root();
619        let client = self.client.lock().await;
620        let mut executor = Executor::Default(client);
621        let (mut candidates, current) = self
622            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623            .await?;
624        // Remove expired candidates
625        candidates.retain(|c| c.1 > current);
626        let mut valid_candidates = Vec::with_capacity(candidates.len());
627        for (c, _) in candidates {
628            let node_info: MetasrvNodeInfo =
629                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630                    input: format!("{:?}", c),
631                })?;
632            valid_candidates.push(node_info);
633        }
634        Ok(valid_candidates)
635    }
636
637    async fn campaign(&self) -> Result<()> {
638        self.maybe_init_client().await?;
639        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641        loop {
642            self.do_campaign().await?;
643            keep_alive_interval.tick().await;
644        }
645    }
646
647    async fn reset_campaign(&self) {
648        info!("Resetting campaign");
649        if self.is_leader.load(Ordering::Relaxed) {
650            if let Err(err) = self.step_down_without_lock().await {
651                error!(err; "Failed to step down without lock");
652            }
653            info!("Step down without lock successfully, due to reset campaign");
654        }
655        if let Err(err) = self.client.lock().await.reset_client().await {
656            error!(err; "Failed to reset client");
657        }
658    }
659
660    async fn leader(&self) -> Result<Self::Leader> {
661        self.maybe_init_client().await?;
662        if self.is_leader.load(Ordering::Relaxed) {
663            Ok(self.leader_value.as_bytes().into())
664        } else {
665            let key = self.election_key();
666
667            let client = self.client.lock().await;
668            let mut executor = Executor::Default(client);
669            if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
670                ensure!(lease.expire_time > lease.current, ElectionNoLeaderSnafu);
671                Ok(lease.leader_value.as_bytes().into())
672            } else {
673                ElectionNoLeaderSnafu.fail()
674            }
675        }
676    }
677
678    async fn resign(&self) -> Result<()> {
679        todo!()
680    }
681
682    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
683        self.leader_watcher.subscribe()
684    }
685}
686
687impl MySqlElection {
688    /// Returns value, expire time and current time.
689    async fn get_value_with_lease(
690        &self,
691        key: &str,
692        executor: &mut Executor<'_>,
693    ) -> Result<Option<Lease>> {
694        let key = key.as_bytes();
695        let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
696        let res = executor
697            .query(query, &self.sql_set.get_value_with_lease)
698            .await?;
699
700        if res.is_empty() {
701            return Ok(None);
702        }
703        // Safety: Checked if res is empty above.
704        let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
705        let current_time = match Timestamp::from_str(&current_time_str, None) {
706            Ok(ts) => ts,
707            Err(_) => UnexpectedSnafu {
708                err_msg: format!("Invalid timestamp: {}", current_time_str),
709            }
710            .fail()?,
711        };
712        // Safety: Checked if res is empty above.
713        let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
714        let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
715
716        Ok(Some(Lease {
717            leader_value: value,
718            expire_time,
719            current: current_time,
720            origin: value_and_expire_time.to_string(),
721        }))
722    }
723
724    /// Returns all values and expire time with the given key prefix. Also returns the current time.
725    async fn get_value_with_lease_by_prefix(
726        &self,
727        key_prefix: &str,
728        executor: &mut Executor<'_>,
729    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
730        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
731        let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
732        let res = executor
733            .query(query, &self.sql_set.get_value_with_lease_by_prefix)
734            .await?;
735
736        let mut values_with_leases = vec![];
737        let mut current = Timestamp::default();
738        for row in res {
739            let current_time_str = row.try_get(1).unwrap_or_default();
740            current = match Timestamp::from_str(current_time_str, None) {
741                Ok(ts) => ts,
742                Err(_) => UnexpectedSnafu {
743                    err_msg: format!("Invalid timestamp: {}", current_time_str),
744                }
745                .fail()?,
746            };
747
748            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
749            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
750
751            values_with_leases.push((value, expire_time));
752        }
753        Ok((values_with_leases, current))
754    }
755
756    async fn update_value_with_lease(
757        &self,
758        key: &str,
759        prev: &str,
760        updated: &str,
761        lease_ttl: u64,
762        executor: &mut Executor<'_>,
763    ) -> Result<()> {
764        let key = key.as_bytes();
765        let prev = prev.as_bytes();
766        let updated = updated.as_bytes();
767
768        let query = sqlx::query(&self.sql_set.update_value_with_lease)
769            .bind(updated)
770            .bind(lease_ttl as f64)
771            .bind(key)
772            .bind(prev);
773        let res = executor
774            .execute(query, &self.sql_set.update_value_with_lease)
775            .await?;
776
777        ensure!(
778            res == 1,
779            UnexpectedSnafu {
780                err_msg: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
781            }
782        );
783
784        Ok(())
785    }
786
787    /// Returns `true` if the insertion is successful
788    async fn put_value_with_lease(
789        &self,
790        key: &str,
791        value: &str,
792        lease_ttl_secs: u64,
793        executor: &mut Executor<'_>,
794    ) -> Result<bool> {
795        let key = key.as_bytes();
796        let lease_ttl_secs = lease_ttl_secs as f64;
797        let query = sqlx::query(&self.sql_set.put_value_with_lease)
798            .bind(key)
799            .bind(value)
800            .bind(lease_ttl_secs);
801        let res = executor
802            .execute(query, &self.sql_set.put_value_with_lease)
803            .await?;
804        Ok(res == 1)
805    }
806
807    /// Returns `true` if the deletion is successful.
808    /// Caution: Should only delete the key if the lease is expired.
809    async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
810        let key = key.as_bytes();
811        let query = sqlx::query(&self.sql_set.delete_value).bind(key);
812        let res = executor.execute(query, &self.sql_set.delete_value).await?;
813
814        Ok(res == 1)
815    }
816
    /// Runs a single round of the campaign.
    ///
    /// Reads the remote leader lease. Depending on three inputs, it then renews the lease,
    /// steps down, tries to become the leader, or does nothing. The inputs are whether the
    /// lease is valid (`lease_check`), whether this node believes it is the leader, and
    /// whether the lease names this node. Repetition is driven by the caller (`campaign`).
    async fn do_campaign(&self) -> Result<()> {
        // Read the lease outside any transaction. The client lock is released when the
        // executor is dropped at the end of this block, before a branch below re-locks it.
        let lease = {
            let client = self.client.lock().await;
            let mut executor = Executor::Default(client);
            self.get_value_with_lease(&self.election_key(), &mut executor)
                .await?
        };

        let is_leader = self.is_leader();
        // If current leader value is the same as the leader value in the remote lease,
        // it means the current leader is still valid.
        // This is `false` when no lease exists, so `true` implies `lease` is `Some`.
        let is_current_leader = lease
            .as_ref()
            .map(|lease| lease.leader_value == self.leader_value)
            .unwrap_or(false);
        // NOTE(review): `lease_check` is defined elsewhere; presumably it returns `Err` when
        // the lease is missing or expired.
        match (self.lease_check(&lease), is_leader, is_current_leader) {
            // If the leader lease is valid and I'm the leader, renew the lease.
            // The campaign statement (`SELECT ... FOR UPDATE`) takes the table lock inside
            // the transaction that `renew_lease` then commits.
            (Ok(_), true, true) => {
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                // Safety: `is_current_leader` is only true when `lease` is `Some`.
                self.renew_lease(executor, lease.unwrap()).await?;
            }
            // If the leader lease expires and I'm the leader, notify the leader watcher and step down.
            // Another instance should be elected as the leader in this case.
            (Err(err), true, _) => {
                warn!(err; "Leader lease expired, step down...");
                self.step_down_without_lock().await?;
            }
            // The lease is still valid but names another node while I think I'm the leader:
            // step down. (The log message reuses the "expired" wording.)
            (Ok(_), true, false) => {
                warn!("Leader lease expired, step down...");
                self.step_down_without_lock().await?;
            }
            // If the leader lease expires and I'm not the leader, elect myself.
            // `elected` (defined elsewhere) receives the open transaction holding the table
            // lock and the lease read above, which may be `None`.
            (Err(err), false, _) => {
                warn!(err; "Leader lease expired, elect myself.");
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                self.elected(executor, lease).await?;
            }
            // The leader lease is valid and names me, but I don't think I'm the leader.
            // Just re-elect myself by renewing the lease.
            (Ok(_), false, true) => {
                warn!("I should be the leader, but I don't think so. Something went wrong.");
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                // Safety: `is_current_leader` is only true when `lease` is `Some`.
                self.renew_lease(executor, lease.unwrap()).await?;
            }
            // If the leader lease is valid and I'm not the leader, do nothing.
            (Ok(_), false, false) => {}
        }
        Ok(())
    }
882
883    /// Renew the lease
884    async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
885        let key = self.election_key();
886        self.update_value_with_lease(
887            &key,
888            &lease.origin,
889            &self.leader_value,
890            self.meta_lease_ttl.as_secs(),
891            &mut executor,
892        )
893        .await?;
894        executor.commit().await?;
895
896        if !self.is_leader() {
897            let key = self.election_key();
898            let leader_key = RdsLeaderKey {
899                name: self.leader_value.clone().into_bytes(),
900                key: key.clone().into_bytes(),
901                ..Default::default()
902            };
903            send_leader_change_and_set_flags(
904                &self.is_leader,
905                &self.leader_infancy,
906                &self.leader_watcher,
907                LeaderChangeMessage::Elected(Arc::new(leader_key)),
908            );
909        }
910
911        Ok(())
912    }
913
914    /// Performs a lease check during the election process.
915    ///
916    /// This function performs the following checks and actions:
917    ///
918    /// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error
919    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
920    ///   will be released.
921    /// - **Case 2**: If all checks pass, the function returns without performing any actions.
922    fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
923        let lease = lease.as_ref().context(ElectionNoLeaderSnafu)?;
924        // Case 1: Lease expired
925        ensure!(
926            lease.expire_time > lease.current,
927            ElectionLeaderLeaseExpiredSnafu
928        );
929        // Case 2: Everything is fine
930        Ok(lease.clone())
931    }
932
933    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
934    async fn step_down_without_lock(&self) -> Result<()> {
935        let key = self.election_key().into_bytes();
936        let leader_key = RdsLeaderKey {
937            name: self.leader_value.clone().into_bytes(),
938            key: key.clone(),
939            ..Default::default()
940        };
941        send_leader_change_and_set_flags(
942            &self.is_leader,
943            &self.leader_infancy,
944            &self.leader_watcher,
945            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
946        );
947        Ok(())
948    }
949
950    /// Elected as leader. The leader should put the key and notify the leader watcher.
951    /// Caution: Should only elected while holding the lock.
952    async fn elected(
953        &self,
954        mut executor: Executor<'_>,
955        expected_lease: Option<Lease>,
956    ) -> Result<()> {
957        let key = self.election_key();
958        let leader_key = RdsLeaderKey {
959            name: self.leader_value.clone().into_bytes(),
960            key: key.clone().into_bytes(),
961            ..Default::default()
962        };
963        let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
964        ensure!(
965            expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
966            ElectionLeaderLeaseChangedSnafu
967        );
968        self.delete_value(&key, &mut executor).await?;
969        self.put_value_with_lease(
970            &key,
971            &self.leader_value,
972            self.meta_lease_ttl.as_secs(),
973            &mut executor,
974        )
975        .await?;
976        executor.commit().await?;
977
978        send_leader_change_and_set_flags(
979            &self.is_leader,
980            &self.leader_infancy,
981            &self.leader_watcher,
982            LeaderChangeMessage::Elected(Arc::new(leader_key)),
983        );
984        Ok(())
985    }
986}
987
988#[cfg(test)]
989mod tests {
990    use std::{assert_matches, env};
991
992    use common_telemetry::init_default_ut_logging;
993    use sqlx::MySqlPool;
994
995    use super::*;
996    use crate::{error, maybe_skip_mysql_integration_test};
997
998    async fn create_mysql_client(
999        table_name: Option<&str>,
1000        execution_timeout: Duration,
1001        wait_timeout: Duration,
1002    ) -> Result<Mutex<ElectionMysqlClient>> {
1003        init_default_ut_logging();
1004        let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
1005        if endpoint.is_empty() {
1006            return UnexpectedSnafu {
1007                err_msg: "MySQL endpoint is empty".to_string(),
1008            }
1009            .fail();
1010        }
1011        let pool = MySqlPool::connect(&endpoint).await.unwrap();
1012        let mut client = ElectionMysqlClient::new(
1013            pool,
1014            execution_timeout,
1015            execution_timeout,
1016            Duration::from_secs(1),
1017            wait_timeout,
1018            table_name.unwrap_or("default_greptime_metakv-election"),
1019        );
1020        client.maybe_init_client().await?;
1021        if table_name.is_some() {
1022            client.ensure_table_exists().await?;
1023        }
1024        Ok(Mutex::new(client))
1025    }
1026
1027    async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1028        let mut client = client.lock().await;
1029        let sql = format!("DROP TABLE IF EXISTS `{}`;", table_name);
1030        client.execute(sqlx::query(&sql), &sql).await.unwrap();
1031    }
1032
    /// Exercises the basic key/value helpers (`put_value_with_lease`, `get_value_with_lease`,
    /// `update_value_with_lease`, `delete_value`, `get_value_with_lease_by_prefix`) against
    /// a real MySQL table, outside of any transaction.
    #[tokio::test]
    async fn test_mysql_crud() {
        maybe_skip_mysql_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_mysql_crud_greptime-metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);

        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        // Sanity check: the table can be locked with `SELECT ... FOR UPDATE` inside a
        // transaction. The transaction is never committed; it is dropped at the end of this
        // scope, which also releases the client mutex guard.
        {
            let mut a = client.lock().await;
            let txn = a.transaction().await.unwrap();
            let mut executor = Executor::Txn(txn);
            let raw_query = format!("SELECT * FROM `{}` FOR UPDATE;", table_name);
            let query = sqlx::query(&raw_query);
            let _ = executor.query(query, &raw_query).await.unwrap();
        }

        let (tx, _) = broadcast::channel(100);
        let mysql_election = MySqlElection {
            leader_value: "test_leader".to_string(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };
        // The executor owns the client mutex guard for the rest of the test.
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        let res = mysql_election
            .put_value_with_lease(&key, &value, 10, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let lease = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        // Update using the raw stored value (`origin`) that was just read.
        mysql_election
            .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
            .await
            .unwrap();

        let res = mysql_election
            .delete_value(&key, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let res = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap();
        assert!(res.is_none());

        // Populate ten keys sharing the `test_key` prefix for the prefix scan below.
        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            mysql_election
                .put_value_with_lease(&key, &value, 10, &mut executor)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // With no matching rows, the prefix scan yields no entries and a default timestamp.
        let (res, current) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());

        // Should drop manually.
        // The executor holds the client mutex guard; it must be released before
        // `drop_table` locks the same mutex.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1139
1140    async fn candidate(
1141        leader_value: String,
1142        candidate_lease_ttl: Duration,
1143        store_key_prefix: String,
1144        table_name: String,
1145    ) {
1146        let meta_lease_ttl = Duration::from_secs(2);
1147        let execution_timeout = Duration::from_secs(10);
1148        let idle_session_timeout = Duration::from_secs(0);
1149        let client =
1150            create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1151                .await
1152                .unwrap();
1153
1154        let (tx, _) = broadcast::channel(100);
1155        let mysql_election = MySqlElection {
1156            leader_value,
1157            client,
1158            is_leader: AtomicBool::new(false),
1159            leader_infancy: AtomicBool::new(true),
1160            leader_watcher: tx,
1161            store_key_prefix,
1162            candidate_lease_ttl,
1163            meta_lease_ttl,
1164            sql_set: ElectionSqlFactory::new(&table_name).build(),
1165        };
1166
1167        let node_info = MetasrvNodeInfo {
1168            addr: "test_addr".to_string(),
1169            version: "test_version".to_string(),
1170            git_commit: "test_git_commit".to_string(),
1171            start_time_ms: 0,
1172            total_cpu_millicores: 0,
1173            total_memory_bytes: 0,
1174            cpu_usage_millicores: 0,
1175            memory_usage_bytes: 0,
1176            hostname: "test_hostname".to_string(),
1177        };
1178        mysql_election.register_candidate(&node_info).await.unwrap();
1179    }
1180
    /// Registers ten candidates concurrently, checks that `all_candidates` sees all of them,
    /// then aborts them and checks that their entries disappear from `all_candidates` once the
    /// candidate lease TTL has elapsed.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_mysql_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime-metakv";
        let mut handles = vec![];
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        // Each candidate runs in its own task with its own client, sharing the key prefix.
        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let mysql_election = MySqlElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid.clone(),
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        let candidates = mysql_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        // Aborting the tasks presumably stops any further lease renewal by the candidates.
        for handle in handles {
            handle.abort();
        }

        // Wait for the candidate leases to expire.
        tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
        let candidates = mysql_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // Garbage collection
        // Expired candidates are no longer reported by `all_candidates`, but their rows are
        // still stored, so every delete below is expected to succeed.
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // Should drop manually.
        // The executor holds the client mutex guard; release it before `drop_table` locks it.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1251
1252    async fn elected(
1253        election: &MySqlElection,
1254        table_name: &str,
1255        expected_lease: Option<Lease>,
1256    ) -> Result<()> {
1257        let mut client = election.client.lock().await;
1258        let txn = client.transaction().await.unwrap();
1259        let mut executor = Executor::Txn(txn);
1260        let raw_query = format!("SELECT * FROM `{}` FOR UPDATE;", table_name);
1261        let query = sqlx::query(&raw_query);
1262        let _ = executor.query(query, &raw_query).await.unwrap();
1263        election.elected(executor, expected_lease).await
1264    }
1265
1266    async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1267        let client = election.client.lock().await;
1268        let mut executor = Executor::Default(client);
1269        election
1270            .get_value_with_lease(&election.election_key(), &mut executor)
1271            .await
1272            .unwrap()
1273    }
1274
    /// `elected` must refuse to take over when the expected lease does not match what is
    /// stored. Here the table holds no lease while `Lease::default()` is expected, so the
    /// origins differ and nothing may be written.
    #[tokio::test]
    async fn test_elected_with_incorrect_lease_fails() {
        maybe_skip_mysql_integration_test!();
        let leader_value = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_failed_greptime-metakv";
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        let (tx, _) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value: leader_value.clone(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        let incorrect_lease = Lease::default();
        let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::ElectionLeaderLeaseChanged { .. });
        // The failed election must not have written the election key.
        let lease = get_lease(&leader_mysql_election).await;
        assert!(lease.is_none());
        drop_table(&leader_mysql_election.client, table_name).await;
    }
1311
    /// After the server closes an idle connection (`wait_timeout` of 2s passed to the client),
    /// queries fail until `reset_client` is called; the leader must then be able to re-elect
    /// itself with the lease it last observed.
    #[tokio::test]
    async fn test_reelection_with_idle_session_timeout() {
        maybe_skip_mysql_integration_test!();
        let leader_value = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_reelection_greptime-metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        // Presumably applied as the MySQL session `wait_timeout` by the client — verify in
        // `ElectionMysqlClient::new`.
        let idle_session_timeout = Duration::from_secs(2);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        let (tx, _) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value: leader_value.clone(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        elected(&leader_mysql_election, table_name, None)
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());
        // Wait for the MySQL server to close the inactive connection.
        tokio::time::sleep(Duration::from_millis(2100)).await;
        // Should fail: the connection has been closed by the server.
        leader_mysql_election
            .client
            .lock()
            .await
            .query(sqlx::query("SELECT 1"), "SELECT 1")
            .await
            .unwrap_err();
        // Reset the client.
        leader_mysql_election
            .client
            .lock()
            .await
            .reset_client()
            .await
            .unwrap();

        // Should be able to be re-elected, using the lease observed before the disconnect.
        elected(&leader_mysql_election, table_name, Some(lease.clone()))
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());
        drop_table(&leader_mysql_election.client, table_name).await;
    }
1375
    /// Walks through elect -> step down -> re-elect, checking the stored lease, the local
    /// leader flag, and the `LeaderChangeMessage` broadcast at each step.
    #[tokio::test]
    async fn test_elected_and_step_down() {
        maybe_skip_mysql_integration_test!();
        let leader_value = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_and_step_down_greptime-metakv";
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        let (tx, mut rx) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value: leader_value.clone(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        // First election: no lease is stored yet, so `None` is the expected lease.
        elected(&leader_mysql_election, table_name, None)
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_mysql_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        // Stepping down without the lock leaves the stored lease untouched.
        leader_mysql_election
            .step_down_without_lock()
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(!leader_mysql_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::StepDown(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_mysql_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::StepDown"),
        }

        // Re-elect, expecting exactly the lease that is currently stored.
        elected(&leader_mysql_election, table_name, Some(lease.clone()))
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_mysql_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        drop_table(&leader_mysql_election.client, table_name).await;
    }
1468
1469    #[tokio::test]
1470    async fn test_campaign() {
1471        maybe_skip_mysql_integration_test!();
1472        let leader_value = "test_leader".to_string();
1473        let uuid = uuid::Uuid::new_v4().to_string();
1474        let table_name = "test_leader_action_greptime-metakv";
1475        let candidate_lease_ttl = Duration::from_secs(5);
1476        let meta_lease_ttl = Duration::from_secs(2);
1477        let execution_timeout = Duration::from_secs(10);
1478        let idle_session_timeout = Duration::from_secs(0);
1479        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1480            .await
1481            .unwrap();
1482
1483        let (tx, mut rx) = broadcast::channel(100);
1484        let leader_mysql_election = MySqlElection {
1485            leader_value: leader_value.clone(),
1486            client,
1487            is_leader: AtomicBool::new(false),
1488            leader_infancy: AtomicBool::new(true),
1489            leader_watcher: tx,
1490            store_key_prefix: uuid,
1491            candidate_lease_ttl,
1492            meta_lease_ttl,
1493            sql_set: ElectionSqlFactory::new(table_name).build(),
1494        };
1495
1496        // Step 1: No leader exists, campaign and elected.
1497        leader_mysql_election.do_campaign().await.unwrap();
1498        let lease = get_lease(&leader_mysql_election).await.unwrap();
1499        assert_eq!(lease.leader_value, leader_value);
1500        assert!(lease.expire_time > lease.current);
1501        assert!(leader_mysql_election.is_leader());
1502
1503        match rx.recv().await {
1504            Ok(LeaderChangeMessage::Elected(key)) => {
1505                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1506                assert_eq!(
1507                    String::from_utf8_lossy(key.key()),
1508                    leader_mysql_election.election_key()
1509                );
1510                assert_eq!(key.lease_id(), i64::default());
1511                assert_eq!(key.revision(), i64::default());
1512            }
1513            _ => panic!("Expected LeaderChangeMessage::Elected"),
1514        }
1515
1516        // Step 2: As a leader, renew the lease.
1517        leader_mysql_election.do_campaign().await.unwrap();
1518        let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1519        assert_eq!(lease.leader_value, leader_value);
1520        // The lease should be renewed.
1521        assert!(new_lease.expire_time > lease.expire_time);
1522        assert!(new_lease.expire_time > new_lease.current);
1523        assert!(leader_mysql_election.is_leader());
1524
1525        // Step 3: Something wrong, the leader lease expired.
1526        tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1527        leader_mysql_election.do_campaign().await.unwrap();
1528        let lease = get_lease(&leader_mysql_election).await.unwrap();
1529        assert_eq!(lease.leader_value, leader_value);
1530        assert!(lease.expire_time <= lease.current);
1531        assert!(!leader_mysql_election.is_leader());
1532
1533        match rx.recv().await {
1534            Ok(LeaderChangeMessage::StepDown(key)) => {
1535                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1536                assert_eq!(
1537                    String::from_utf8_lossy(key.key()),
1538                    leader_mysql_election.election_key()
1539                );
1540                assert_eq!(key.lease_id(), i64::default());
1541                assert_eq!(key.revision(), i64::default());
1542            }
1543            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1544        }
1545
1546        // Step 4: Re-elect itself.
1547        leader_mysql_election.do_campaign().await.unwrap();
1548        let lease = get_lease(&leader_mysql_election).await.unwrap();
1549        assert_eq!(lease.leader_value, leader_value);
1550        assert!(lease.expire_time > lease.current);
1551        assert!(leader_mysql_election.is_leader());
1552
1553        match rx.recv().await {
1554            Ok(LeaderChangeMessage::Elected(key)) => {
1555                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1556                assert_eq!(
1557                    String::from_utf8_lossy(key.key()),
1558                    leader_mysql_election.election_key()
1559                );
1560                assert_eq!(key.lease_id(), i64::default());
1561                assert_eq!(key.revision(), i64::default());
1562            }
1563            _ => panic!("Expected LeaderChangeMessage::Elected"),
1564        }
1565
1566        // Step 5: Something wrong, the leader key is deleted by other followers.
1567        {
1568            let client = leader_mysql_election.client.lock().await;
1569            let mut executor = Executor::Default(client);
1570            leader_mysql_election
1571                .delete_value(&leader_mysql_election.election_key(), &mut executor)
1572                .await
1573                .unwrap();
1574        }
1575        leader_mysql_election.do_campaign().await.unwrap();
1576        let res = get_lease(&leader_mysql_election).await;
1577        assert!(res.is_none());
1578        assert!(!leader_mysql_election.is_leader());
1579
1580        match rx.recv().await {
1581            Ok(LeaderChangeMessage::StepDown(key)) => {
1582                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1583                assert_eq!(
1584                    String::from_utf8_lossy(key.key()),
1585                    leader_mysql_election.election_key()
1586                );
1587                assert_eq!(key.lease_id(), i64::default());
1588                assert_eq!(key.revision(), i64::default());
1589            }
1590            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1591        }
1592
1593        // Step 6: Re-elect itself.
1594        leader_mysql_election.do_campaign().await.unwrap();
1595        let lease = get_lease(&leader_mysql_election).await.unwrap();
1596        assert_eq!(lease.leader_value, leader_value);
1597        assert!(lease.expire_time > lease.current);
1598        assert!(leader_mysql_election.is_leader());
1599
1600        match rx.recv().await {
1601            Ok(LeaderChangeMessage::Elected(key)) => {
1602                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1603                assert_eq!(
1604                    String::from_utf8_lossy(key.key()),
1605                    leader_mysql_election.election_key()
1606                );
1607                assert_eq!(key.lease_id(), i64::default());
1608                assert_eq!(key.revision(), i64::default());
1609            }
1610            _ => panic!("Expected LeaderChangeMessage::Elected"),
1611        }
1612
1613        // Step 7: Something wrong, the leader key changed by others.
1614        let another_leader_key = "another_leader";
1615        {
1616            let client = leader_mysql_election.client.lock().await;
1617            let mut executor = Executor::Default(client);
1618            leader_mysql_election
1619                .delete_value(&leader_mysql_election.election_key(), &mut executor)
1620                .await
1621                .unwrap();
1622            leader_mysql_election
1623                .put_value_with_lease(
1624                    &leader_mysql_election.election_key(),
1625                    another_leader_key,
1626                    10,
1627                    &mut executor,
1628                )
1629                .await
1630                .unwrap();
1631        }
1632        leader_mysql_election.do_campaign().await.unwrap();
1633        let lease = get_lease(&leader_mysql_election).await.unwrap();
1634        // Different from pg, mysql will not delete the key, just step down.
1635        assert_eq!(lease.leader_value, another_leader_key);
1636        assert!(lease.expire_time > lease.current);
1637        assert!(!leader_mysql_election.is_leader());
1638
1639        match rx.recv().await {
1640            Ok(LeaderChangeMessage::StepDown(key)) => {
1641                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1642                assert_eq!(
1643                    String::from_utf8_lossy(key.key()),
1644                    leader_mysql_election.election_key()
1645                );
1646                assert_eq!(key.lease_id(), i64::default());
1647                assert_eq!(key.revision(), i64::default());
1648            }
1649            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1650        }
1651
1652        drop_table(&leader_mysql_election.client, table_name).await;
1653    }
1654
1655    #[tokio::test]
1656    async fn test_reset_campaign() {
1657        maybe_skip_mysql_integration_test!();
1658        common_telemetry::init_default_ut_logging();
1659        let leader_value = "test_leader".to_string();
1660        let uuid = uuid::Uuid::new_v4().to_string();
1661        let table_name = "test_reset_campaign_greptime-metakv";
1662        let candidate_lease_ttl = Duration::from_secs(5);
1663        let meta_lease_ttl = Duration::from_secs(2);
1664        let execution_timeout = Duration::from_secs(10);
1665        let idle_session_timeout = Duration::from_secs(0);
1666        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1667            .await
1668            .unwrap();
1669
1670        let (tx, _) = broadcast::channel(100);
1671        let leader_mysql_election = MySqlElection {
1672            leader_value,
1673            client,
1674            is_leader: AtomicBool::new(false),
1675            leader_infancy: AtomicBool::new(true),
1676            leader_watcher: tx,
1677            store_key_prefix: uuid,
1678            candidate_lease_ttl,
1679            meta_lease_ttl,
1680            sql_set: ElectionSqlFactory::new(table_name).build(),
1681        };
1682        leader_mysql_election
1683            .is_leader
1684            .store(true, Ordering::Relaxed);
1685        leader_mysql_election.reset_campaign().await;
1686        assert!(!leader_mysql_election.is_leader());
1687        drop_table(&leader_mysql_election.client, table_name).await;
1688    }
1689
1690    #[tokio::test]
1691    async fn test_follower_action() {
1692        maybe_skip_mysql_integration_test!();
1693        common_telemetry::init_default_ut_logging();
1694        let candidate_lease_ttl = Duration::from_secs(5);
1695        let meta_lease_ttl = Duration::from_secs(1);
1696        let execution_timeout = Duration::from_secs(10);
1697        let idle_session_timeout = Duration::from_secs(0);
1698        let uuid = uuid::Uuid::new_v4().to_string();
1699        let table_name = "test_follower_action_greptime-metakv";
1700
1701        let follower_client =
1702            create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1703                .await
1704                .unwrap();
1705        let (tx, mut rx) = broadcast::channel(100);
1706        let follower_mysql_election = MySqlElection {
1707            leader_value: "test_follower".to_string(),
1708            client: follower_client,
1709            is_leader: AtomicBool::new(false),
1710            leader_infancy: AtomicBool::new(true),
1711            leader_watcher: tx,
1712            store_key_prefix: uuid.clone(),
1713            candidate_lease_ttl,
1714            meta_lease_ttl,
1715            sql_set: ElectionSqlFactory::new(table_name).build(),
1716        };
1717
1718        let leader_client =
1719            create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1720                .await
1721                .unwrap();
1722        let (tx, _) = broadcast::channel(100);
1723        let leader_mysql_election = MySqlElection {
1724            leader_value: "test_leader".to_string(),
1725            client: leader_client,
1726            is_leader: AtomicBool::new(false),
1727            leader_infancy: AtomicBool::new(true),
1728            leader_watcher: tx,
1729            store_key_prefix: uuid,
1730            candidate_lease_ttl,
1731            meta_lease_ttl,
1732            sql_set: ElectionSqlFactory::new(table_name).build(),
1733        };
1734
1735        leader_mysql_election.do_campaign().await.unwrap();
1736
1737        // Step 1: As a follower, the leader exists and the lease is not expired. Do nothing.
1738        follower_mysql_election.do_campaign().await.unwrap();
1739
1740        // Step 2: As a follower, the leader exists but the lease expired. Re-elect itself.
1741        tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1742        follower_mysql_election.do_campaign().await.unwrap();
1743        assert!(follower_mysql_election.is_leader());
1744
1745        match rx.recv().await {
1746            Ok(LeaderChangeMessage::Elected(key)) => {
1747                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1748                assert_eq!(
1749                    String::from_utf8_lossy(key.key()),
1750                    follower_mysql_election.election_key()
1751                );
1752                assert_eq!(key.lease_id(), i64::default());
1753                assert_eq!(key.revision(), i64::default());
1754            }
1755            _ => panic!("Expected LeaderChangeMessage::Elected"),
1756        }
1757
1758        drop_table(&follower_mysql_election.client, table_name).await;
1759    }
1760
1761    #[tokio::test]
1762    async fn test_wait_timeout() {
1763        maybe_skip_mysql_integration_test!();
1764        common_telemetry::init_default_ut_logging();
1765        let execution_timeout = Duration::from_secs(10);
1766        let idle_session_timeout = Duration::from_secs(1);
1767
1768        let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1769            .await
1770            .unwrap();
1771        tokio::time::sleep(Duration::from_millis(1100)).await;
1772        // Wait for the idle session timeout.
1773        let err = client
1774            .lock()
1775            .await
1776            .query(sqlx::query("SELECT 1"), "SELECT 1")
1777            .await
1778            .unwrap_err();
1779        assert_matches!(err, error::Error::MySqlExecution { .. });
1780        // Reset the client and try again.
1781        client.lock().await.reset_client().await.unwrap();
1782        let _ = client
1783            .lock()
1784            .await
1785            .query(sqlx::query("SELECT 1"), "SELECT 1")
1786            .await
1787            .unwrap();
1788    }
1789}