meta_srv/election/rds/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, info, warn};
21use common_time::Timestamp;
22use snafu::{OptionExt, ResultExt, ensure};
23use sqlx::mysql::{MySqlArguments, MySqlRow};
24use sqlx::pool::PoolConnection;
25use sqlx::query::Query;
26use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
27use tokio::sync::{Mutex, MutexGuard, broadcast};
28use tokio::time::MissedTickBehavior;
29
30use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
31use crate::election::{
32    Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
33};
34use crate::error::{
35    AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36    LeaderLeaseChangedSnafu, LeaderLeaseExpiredSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result,
37    SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
40
/// Renders the SQL statements used by the MySQL election for one election table.
struct ElectionSqlFactory<'t> {
    /// Name of the election table, interpolated into the generated statements.
    table_name: &'t str,
}
44
/// Pre-rendered SQL statements used by the MySQL election.
///
/// Every statement uses MySQL positional `?` placeholders; parameters must be bound in the
/// order listed on each field.
struct ElectionSqlSet {
    /// `SELECT * ... FOR UPDATE` over the whole election table.
    ///
    /// Executed at the start of a campaign transaction to take row locks before the lease is
    /// written. Takes no parameters.
    campaign: String,
    /// Upserts a value with an expire time.
    ///
    /// Parameters, in bind order:
    /// 1. key,
    /// 2. value,
    /// 3. lease time in seconds.
    ///
    /// The stored value is `value`, the lease separator, then the server time plus the lease.
    /// Uses `INSERT ... ON DUPLICATE KEY UPDATE`, so an existing key is overwritten. The
    /// statement reports rows affected, not the previous value.
    put_value_with_lease: String,
    /// Updates a value and its expire time, but only if the stored value still equals the
    /// expected previous value (compare-and-set).
    ///
    /// Parameters, in bind order:
    /// 1. updated value,
    /// 2. lease time in seconds,
    /// 3. key,
    /// 4. previous value.
    update_value_with_lease: String,
    /// Reads the value (with expire time) stored under a key.
    ///
    /// Parameters:
    /// 1. key.
    ///
    /// Returns:
    /// - column 0: value with expire time,
    /// - column 1: current server time.
    get_value_with_lease: String,
    /// Reads all values (with expire time) whose key matches a `LIKE` pattern.
    ///
    /// Parameters:
    /// 1. key pattern such as `'prefix%'`.
    ///
    /// Returns:
    /// - column 0: value with expire time,
    /// - column 1: current server time.
    get_value_with_lease_by_prefix: String,
    /// Deletes the row stored under a key.
    ///
    /// Parameters:
    /// 1. key.
    ///
    /// Returns rows affected.
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90    fn new(table_name: &'a str) -> Self {
91        Self { table_name }
92    }
93
94    fn build(self) -> ElectionSqlSet {
95        ElectionSqlSet {
96            campaign: self.campaign_sql(),
97            put_value_with_lease: self.put_value_with_lease_sql(),
98            update_value_with_lease: self.update_value_with_lease_sql(),
99            get_value_with_lease: self.get_value_with_lease_sql(),
100            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101            delete_value: self.delete_value_sql(),
102        }
103    }
104
105    /// Use `SELECT FOR UPDATE` to lock for compatibility with other MySQL-compatible databases
106    /// instead of directly using `GET_LOCK`.
107    fn campaign_sql(&self) -> String {
108        format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109    }
110
111    fn put_value_with_lease_sql(&self) -> String {
112        format!(
113            r#"
114            INSERT INTO `{}` (k, v) VALUES (
115                ?,
116                CONCAT(
117                    ?,
118                    '{}',
119                    DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120                )
121            )
122            ON DUPLICATE KEY UPDATE v = VALUES(v);
123            "#,
124            self.table_name, LEASE_SEP
125        )
126    }
127
128    fn update_value_with_lease_sql(&self) -> String {
129        format!(
130            r#"UPDATE `{}`
131               SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132               WHERE k = ? AND v = ?"#,
133            self.table_name, LEASE_SEP
134        )
135    }
136
137    fn get_value_with_lease_sql(&self) -> String {
138        format!(
139            r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140            self.table_name
141        )
142    }
143
144    fn get_value_with_lease_by_prefix_sql(&self) -> String {
145        format!(
146            r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147            self.table_name
148        )
149    }
150
151    fn delete_value_sql(&self) -> String {
152        format!("DELETE FROM {} WHERE k = ?;", self.table_name)
153    }
154}
155
156enum Executor<'a> {
157    Default(MutexGuard<'a, ElectionMysqlClient>),
158    Txn(TransactionWithExecutionTimeout<'a>),
159}
160
161impl Executor<'_> {
162    async fn query(
163        &mut self,
164        query: Query<'_, MySql, MySqlArguments>,
165        sql: &str,
166    ) -> Result<Vec<MySqlRow>> {
167        match self {
168            Executor::Default(client) => client.query(query, sql).await,
169            Executor::Txn(txn) => txn.query(query, sql).await,
170        }
171    }
172
173    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174        match self {
175            Executor::Default(client) => client.execute(query, sql).await,
176            Executor::Txn(txn) => txn.execute(query, sql).await,
177        }
178    }
179
180    async fn commit(self) -> Result<()> {
181        match self {
182            Executor::Txn(txn) => txn.commit().await,
183            _ => Ok(()),
184        }
185    }
186}
187
188/// MySQL client for election.
189pub struct ElectionMysqlClient {
190    current: Option<PoolConnection<MySql>>,
191    pool: MySqlPool,
192
193    /// The client-side timeout for statement execution.
194    ///
195    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
196    /// If a statement takes longer than this duration to execute, the client will abort the operation.
197    execution_timeout: Duration,
198
199    /// The maximum execution time for the statement.
200    ///
201    /// This timeout is enforced by the server and is independent of any client-side timeouts.
202    /// If a statement takes longer than this duration to execute, the server will abort the operation.
203    max_execution_time: Duration,
204
205    /// The lock wait timeout for the session.
206    ///
207    /// This timeout determines how long the server waits for a lock to be acquired before timing out.
208    /// If a lock cannot be acquired within this duration, the server will abort the operation.
209    innode_lock_wait_timeout: Duration,
210
211    /// The wait timeout for the session.
212    ///
213    /// This timeout determines how long the server waits for activity on a noninteractive connection
214    /// before closing it. If a connection is idle for longer than this duration, the server will
215    /// terminate it.
216    wait_timeout: Duration,
217
218    /// The table name for election.
219    table_for_election: String,
220}
221
222impl ElectionMysqlClient {
223    pub fn new(
224        pool: MySqlPool,
225        execution_timeout: Duration,
226        max_execution_time: Duration,
227        innode_lock_wait_timeout: Duration,
228        wait_timeout: Duration,
229        table_for_election: &str,
230    ) -> Self {
231        Self {
232            current: None,
233            pool,
234            execution_timeout,
235            max_execution_time,
236            innode_lock_wait_timeout,
237            wait_timeout,
238            table_for_election: table_for_election.to_string(),
239        }
240    }
241
242    fn create_table_sql(&self) -> String {
243        format!(
244            r#"
245            CREATE TABLE IF NOT EXISTS `{}` (
246                k VARBINARY(3072) PRIMARY KEY,
247                v BLOB
248            );"#,
249            self.table_for_election
250        )
251    }
252
253    fn insert_once_sql(&self) -> String {
254        format!(
255            "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256            self.table_for_election
257        )
258    }
259
260    fn check_version_sql(&self) -> String {
261        "SELECT @@version;".to_string()
262    }
263
264    async fn reset_client(&mut self) -> Result<()> {
265        self.current = None;
266        self.maybe_init_client().await
267    }
268
269    async fn ensure_table_exists(&mut self) -> Result<()> {
270        let create_table_sql = self.create_table_sql();
271        let query = sqlx::query(&create_table_sql);
272        self.execute(query, &create_table_sql).await?;
273        // Insert at least one row for `SELECT * FOR UPDATE` to work.
274        let insert_once_sql = self.insert_once_sql();
275        let query = sqlx::query(&insert_once_sql);
276        self.execute(query, &insert_once_sql).await?;
277        Ok(())
278    }
279
280    async fn maybe_init_client(&mut self) -> Result<()> {
281        if self.current.is_none() {
282            let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284            self.current = Some(client);
285            let (query, sql) = if !self.wait_timeout.is_zero() {
286                let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287                (
288                    sqlx::query(sql)
289                        .bind(self.wait_timeout.as_secs())
290                        .bind(self.innode_lock_wait_timeout.as_secs())
291                        .bind(self.max_execution_time.as_millis() as u64),
292                    sql,
293                )
294            } else {
295                let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296                (
297                    sqlx::query(sql)
298                        .bind(self.innode_lock_wait_timeout.as_secs())
299                        .bind(self.max_execution_time.as_millis() as u64),
300                    sql,
301                )
302            };
303            self.set_session_isolation_level().await?;
304            self.execute(query, sql).await?;
305            self.check_version(&self.check_version_sql()).await?;
306        }
307        Ok(())
308    }
309
310    /// Set session isolation level to serializable.
311    ///
312    /// # Panics
313    /// if `current` is `None`.
314    async fn set_session_isolation_level(&mut self) -> Result<()> {
315        let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316        let query = sqlx::query(sql);
317        self.execute(query, sql).await?;
318        Ok(())
319    }
320
321    /// Check if the MySQL version is supported.
322    ///
323    /// # Panics
324    /// if `current` is `None`.
325    async fn check_version(&mut self, sql: &str) -> Result<()> {
326        // Check if the MySQL version is supported.
327        let query = sqlx::query(sql);
328        // Safety: `maybe_init_client` ensures `current` is not `None`.
329        let client = self.current.as_mut().unwrap();
330        let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331            .await
332            .map_err(|_| {
333                SqlExecutionTimeoutSnafu {
334                    sql,
335                    duration: self.execution_timeout,
336                }
337                .build()
338            })?;
339        match row {
340            Ok(row) => {
341                let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342                if !version.starts_with("8.0") && !version.starts_with("5.7") {
343                    warn!(
344                        "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345                        version
346                    );
347                }
348            }
349            Err(e) => {
350                warn!(e; "Failed to check MySQL version through sql: {}", sql);
351            }
352        }
353        Ok(())
354    }
355
356    /// Returns the result of the query.
357    ///
358    /// # Panics
359    /// if `current` is `None`.
360    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361        // Safety: `maybe_init_client` ensures `current` is not `None`.
362        let client = self.current.as_mut().unwrap();
363        let future = query.execute(&mut **client);
364        let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365            .await
366            .map_err(|_| {
367                SqlExecutionTimeoutSnafu {
368                    sql,
369                    duration: self.execution_timeout,
370                }
371                .build()
372            })?
373            .context(MySqlExecutionSnafu { sql })?
374            .rows_affected();
375        Ok(rows_affected)
376    }
377
378    /// Returns the result of the query.
379    ///
380    /// # Panics
381    /// if `current` is `None`.
382    async fn query(
383        &mut self,
384        query: Query<'_, MySql, MySqlArguments>,
385        sql: &str,
386    ) -> Result<Vec<MySqlRow>> {
387        // Safety: `maybe_init_client` ensures `current` is not `None`.
388        let client = self.current.as_mut().unwrap();
389        let future = query.fetch_all(&mut **client);
390        tokio::time::timeout(self.execution_timeout, future)
391            .await
392            .map_err(|_| {
393                SqlExecutionTimeoutSnafu {
394                    sql,
395                    duration: self.execution_timeout,
396                }
397                .build()
398            })?
399            .context(MySqlExecutionSnafu { sql })
400    }
401
402    async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403        use sqlx::Acquire;
404        let client = self.current.as_mut().unwrap();
405        let transaction = client
406            .begin()
407            .await
408            .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410        Ok(TransactionWithExecutionTimeout {
411            transaction,
412            execution_timeout: self.execution_timeout,
413        })
414    }
415}
416
417struct TransactionWithExecutionTimeout<'a> {
418    transaction: MySqlTransaction<'a>,
419    execution_timeout: Duration,
420}
421
422impl TransactionWithExecutionTimeout<'_> {
423    async fn query(
424        &mut self,
425        query: Query<'_, MySql, MySqlArguments>,
426        sql: &str,
427    ) -> Result<Vec<MySqlRow>> {
428        let res = tokio::time::timeout(
429            self.execution_timeout,
430            query.fetch_all(&mut *self.transaction),
431        )
432        .await
433        .map_err(|_| {
434            SqlExecutionTimeoutSnafu {
435                sql,
436                duration: self.execution_timeout,
437            }
438            .build()
439        })?
440        .context(MySqlExecutionSnafu { sql })?;
441        Ok(res)
442    }
443
444    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445        let res = tokio::time::timeout(
446            self.execution_timeout,
447            query.execute(&mut *self.transaction),
448        )
449        .await
450        .map_err(|_| {
451            SqlExecutionTimeoutSnafu {
452                sql,
453                duration: self.execution_timeout,
454            }
455            .build()
456        })?
457        .context(MySqlExecutionSnafu { sql })?;
458        Ok(res.rows_affected())
459    }
460
461    async fn commit(self) -> Result<()> {
462        tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463            .await
464            .map_err(|_| {
465                SqlExecutionTimeoutSnafu {
466                    sql: "COMMIT",
467                    duration: self.execution_timeout,
468                }
469                .build()
470            })?
471            .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472        Ok(())
473    }
474}
475
476/// MySQL implementation of Election.
477pub struct MySqlElection {
478    leader_value: String,
479    client: Mutex<ElectionMysqlClient>,
480    is_leader: AtomicBool,
481    leader_infancy: AtomicBool,
482    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
483    store_key_prefix: String,
484    candidate_lease_ttl: Duration,
485    meta_lease_ttl: Duration,
486    sql_set: ElectionSqlSet,
487}
488
489impl MySqlElection {
490    pub async fn with_mysql_client(
491        leader_value: String,
492        mut client: ElectionMysqlClient,
493        store_key_prefix: String,
494        candidate_lease_ttl: Duration,
495        meta_lease_ttl: Duration,
496        table_name: &str,
497    ) -> Result<ElectionRef> {
498        let sql_factory = ElectionSqlFactory::new(table_name);
499        client.maybe_init_client().await?;
500        client.ensure_table_exists().await?;
501        let tx = listen_leader_change(leader_value.clone());
502        Ok(Arc::new(Self {
503            leader_value,
504            client: Mutex::new(client),
505            is_leader: AtomicBool::new(false),
506            leader_infancy: AtomicBool::new(false),
507            leader_watcher: tx,
508            store_key_prefix,
509            candidate_lease_ttl,
510            meta_lease_ttl,
511            sql_set: sql_factory.build(),
512        }))
513    }
514
515    fn election_key(&self) -> String {
516        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517    }
518
519    fn candidate_root(&self) -> String {
520        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521    }
522
523    fn candidate_key(&self) -> String {
524        format!("{}{}", self.candidate_root(), self.leader_value)
525    }
526
527    async fn maybe_init_client(&self) -> Result<()> {
528        let mut client = self.client.lock().await;
529        client.maybe_init_client().await?;
530        Ok(())
531    }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536    type Leader = LeaderValue;
537
538    fn is_leader(&self) -> bool {
539        self.is_leader.load(Ordering::Relaxed)
540    }
541
542    fn in_leader_infancy(&self) -> bool {
543        self.leader_infancy
544            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545            .is_ok()
546    }
547
548    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549        self.maybe_init_client().await?;
550        let key = self.candidate_key();
551        let node_info =
552            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553                input: format!("{node_info:?}"),
554            })?;
555
556        {
557            let client = self.client.lock().await;
558            let mut executor = Executor::Default(client);
559            let res = self
560                .put_value_with_lease(
561                    &key,
562                    &node_info,
563                    self.candidate_lease_ttl.as_secs(),
564                    &mut executor,
565                )
566                .await?;
567            // May registered before, just update the lease.
568            if !res {
569                warn!("Candidate already registered, update the lease");
570                self.delete_value(&key, &mut executor).await?;
571                self.put_value_with_lease(
572                    &key,
573                    &node_info,
574                    self.candidate_lease_ttl.as_secs(),
575                    &mut executor,
576                )
577                .await?;
578            }
579        }
580
581        // Check if the current lease has expired and renew the lease.
582        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583        loop {
584            let _ = keep_alive_interval.tick().await;
585            let client = self.client.lock().await;
586            let mut executor = Executor::Default(client);
587            let lease = self
588                .get_value_with_lease(&key, &mut executor)
589                .await?
590                .unwrap_or_default();
591
592            ensure!(
593                lease.expire_time > lease.current,
594                UnexpectedSnafu {
595                    violated: format!(
596                        "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597                        lease.expire_time,
598                        lease.current,
599                        String::from_utf8_lossy(&key.into_bytes())
600                    ),
601                }
602            );
603
604            self.update_value_with_lease(
605                &key,
606                &lease.origin,
607                &node_info,
608                self.candidate_lease_ttl.as_secs(),
609                &mut executor,
610            )
611            .await?;
612            std::mem::drop(executor);
613        }
614    }
615
616    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617        self.maybe_init_client().await?;
618        let key_prefix = self.candidate_root();
619        let client = self.client.lock().await;
620        let mut executor = Executor::Default(client);
621        let (mut candidates, current) = self
622            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623            .await?;
624        // Remove expired candidates
625        candidates.retain(|c| c.1 > current);
626        let mut valid_candidates = Vec::with_capacity(candidates.len());
627        for (c, _) in candidates {
628            let node_info: MetasrvNodeInfo =
629                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630                    input: format!("{:?}", c),
631                })?;
632            valid_candidates.push(node_info);
633        }
634        Ok(valid_candidates)
635    }
636
637    async fn campaign(&self) -> Result<()> {
638        self.maybe_init_client().await?;
639        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641        loop {
642            self.do_campaign().await?;
643            keep_alive_interval.tick().await;
644        }
645    }
646
647    async fn reset_campaign(&self) {
648        info!("Resetting campaign");
649        if self.is_leader.load(Ordering::Relaxed) {
650            if let Err(err) = self.step_down_without_lock().await {
651                error!(err; "Failed to step down without lock");
652            }
653            info!("Step down without lock successfully, due to reset campaign");
654        }
655        if let Err(err) = self.client.lock().await.reset_client().await {
656            error!(err; "Failed to reset client");
657        }
658    }
659
660    async fn leader(&self) -> Result<Self::Leader> {
661        self.maybe_init_client().await?;
662        if self.is_leader.load(Ordering::Relaxed) {
663            Ok(self.leader_value.as_bytes().into())
664        } else {
665            let key = self.election_key();
666
667            let client = self.client.lock().await;
668            let mut executor = Executor::Default(client);
669            if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
670                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
671                Ok(lease.leader_value.as_bytes().into())
672            } else {
673                NoLeaderSnafu.fail()
674            }
675        }
676    }
677
678    async fn resign(&self) -> Result<()> {
679        todo!()
680    }
681
682    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
683        self.leader_watcher.subscribe()
684    }
685}
686
687impl MySqlElection {
688    /// Returns value, expire time and current time.
689    async fn get_value_with_lease(
690        &self,
691        key: &str,
692        executor: &mut Executor<'_>,
693    ) -> Result<Option<Lease>> {
694        let key = key.as_bytes();
695        let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
696        let res = executor
697            .query(query, &self.sql_set.get_value_with_lease)
698            .await?;
699
700        if res.is_empty() {
701            return Ok(None);
702        }
703        // Safety: Checked if res is empty above.
704        let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
705        let current_time = match Timestamp::from_str(&current_time_str, None) {
706            Ok(ts) => ts,
707            Err(_) => UnexpectedSnafu {
708                violated: format!("Invalid timestamp: {}", current_time_str),
709            }
710            .fail()?,
711        };
712        // Safety: Checked if res is empty above.
713        let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
714        let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
715
716        Ok(Some(Lease {
717            leader_value: value,
718            expire_time,
719            current: current_time,
720            origin: value_and_expire_time.to_string(),
721        }))
722    }
723
724    /// Returns all values and expire time with the given key prefix. Also returns the current time.
725    async fn get_value_with_lease_by_prefix(
726        &self,
727        key_prefix: &str,
728        executor: &mut Executor<'_>,
729    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
730        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
731        let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
732        let res = executor
733            .query(query, &self.sql_set.get_value_with_lease_by_prefix)
734            .await?;
735
736        let mut values_with_leases = vec![];
737        let mut current = Timestamp::default();
738        for row in res {
739            let current_time_str = row.try_get(1).unwrap_or_default();
740            current = match Timestamp::from_str(current_time_str, None) {
741                Ok(ts) => ts,
742                Err(_) => UnexpectedSnafu {
743                    violated: format!("Invalid timestamp: {}", current_time_str),
744                }
745                .fail()?,
746            };
747
748            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
749            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
750
751            values_with_leases.push((value, expire_time));
752        }
753        Ok((values_with_leases, current))
754    }
755
756    async fn update_value_with_lease(
757        &self,
758        key: &str,
759        prev: &str,
760        updated: &str,
761        lease_ttl: u64,
762        executor: &mut Executor<'_>,
763    ) -> Result<()> {
764        let key = key.as_bytes();
765        let prev = prev.as_bytes();
766        let updated = updated.as_bytes();
767
768        let query = sqlx::query(&self.sql_set.update_value_with_lease)
769            .bind(updated)
770            .bind(lease_ttl as f64)
771            .bind(key)
772            .bind(prev);
773        let res = executor
774            .execute(query, &self.sql_set.update_value_with_lease)
775            .await?;
776
777        ensure!(
778            res == 1,
779            UnexpectedSnafu {
780                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
781            }
782        );
783
784        Ok(())
785    }
786
787    /// Returns `true` if the insertion is successful
788    async fn put_value_with_lease(
789        &self,
790        key: &str,
791        value: &str,
792        lease_ttl_secs: u64,
793        executor: &mut Executor<'_>,
794    ) -> Result<bool> {
795        let key = key.as_bytes();
796        let lease_ttl_secs = lease_ttl_secs as f64;
797        let query = sqlx::query(&self.sql_set.put_value_with_lease)
798            .bind(key)
799            .bind(value)
800            .bind(lease_ttl_secs);
801        let res = executor
802            .execute(query, &self.sql_set.put_value_with_lease)
803            .await?;
804        Ok(res == 1)
805    }
806
807    /// Returns `true` if the deletion is successful.
808    /// Caution: Should only delete the key if the lease is expired.
809    async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
810        let key = key.as_bytes();
811        let query = sqlx::query(&self.sql_set.delete_value).bind(key);
812        let res = executor.execute(query, &self.sql_set.delete_value).await?;
813
814        Ok(res == 1)
815    }
816
817    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
818    /// if the current lease is still valid.
819    async fn do_campaign(&self) -> Result<()> {
820        let lease = {
821            let client = self.client.lock().await;
822            let mut executor = Executor::Default(client);
823            self.get_value_with_lease(&self.election_key(), &mut executor)
824                .await?
825        };
826
827        let is_leader = self.is_leader();
828        // If current leader value is the same as the leader value in the remote lease,
829        // it means the current leader is still valid.
830        let is_current_leader = lease
831            .as_ref()
832            .map(|lease| lease.leader_value == self.leader_value)
833            .unwrap_or(false);
834        match (self.lease_check(&lease), is_leader, is_current_leader) {
835            // If the leader lease is valid and I'm the leader, renew the lease.
836            (Ok(_), true, true) => {
837                let mut client = self.client.lock().await;
838                let txn = client.transaction().await?;
839                let mut executor = Executor::Txn(txn);
840                let query = sqlx::query(&self.sql_set.campaign);
841                executor.query(query, &self.sql_set.campaign).await?;
842                // Safety: Checked if lease is not None above.
843                self.renew_lease(executor, lease.unwrap()).await?;
844            }
845            // If the leader lease expires and I'm the leader, notify the leader watcher and step down.
846            // Another instance should be elected as the leader in this case.
847            (Err(err), true, _) => {
848                warn!(err; "Leader lease expired, step down...");
849                self.step_down_without_lock().await?;
850            }
851            (Ok(_), true, false) => {
852                warn!("Leader lease expired, step down...");
853                self.step_down_without_lock().await?;
854            }
855            // If the leader lease expires and I'm not the leader, elect myself.
856            (Err(err), false, _) => {
857                warn!(err; "Leader lease expired, elect myself.");
858                let mut client = self.client.lock().await;
859                let txn = client.transaction().await?;
860                let mut executor = Executor::Txn(txn);
861                let query = sqlx::query(&self.sql_set.campaign);
862                executor.query(query, &self.sql_set.campaign).await?;
863                self.elected(executor, lease).await?;
864            }
865            // If the leader lease is valid and I'm the leader, but I don't think I'm the leader.
866            // Just re-elect myself.
867            (Ok(_), false, true) => {
868                warn!("I should be the leader, but I don't think so. Something went wrong.");
869                let mut client = self.client.lock().await;
870                let txn = client.transaction().await?;
871                let mut executor = Executor::Txn(txn);
872                let query = sqlx::query(&self.sql_set.campaign);
873                executor.query(query, &self.sql_set.campaign).await?;
874                // Safety: Checked if lease is not None above.
875                self.renew_lease(executor, lease.unwrap()).await?;
876            }
877            // If the leader lease is valid and I'm not the leader, do nothing.
878            (Ok(_), false, false) => {}
879        }
880        Ok(())
881    }
882
883    /// Renew the lease
884    async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
885        let key = self.election_key();
886        self.update_value_with_lease(
887            &key,
888            &lease.origin,
889            &self.leader_value,
890            self.meta_lease_ttl.as_secs(),
891            &mut executor,
892        )
893        .await?;
894        executor.commit().await?;
895
896        if !self.is_leader() {
897            let key = self.election_key();
898            let leader_key = RdsLeaderKey {
899                name: self.leader_value.clone().into_bytes(),
900                key: key.clone().into_bytes(),
901                ..Default::default()
902            };
903            send_leader_change_and_set_flags(
904                &self.is_leader,
905                &self.leader_infancy,
906                &self.leader_watcher,
907                LeaderChangeMessage::Elected(Arc::new(leader_key)),
908            );
909        }
910
911        Ok(())
912    }
913
914    /// Performs a lease check during the election process.
915    ///
916    /// This function performs the following checks and actions:
917    ///
918    /// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error
919    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
920    ///   will be released.
921    /// - **Case 2**: If all checks pass, the function returns without performing any actions.
922    fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
923        let lease = lease.as_ref().context(NoLeaderSnafu)?;
924        // Case 1: Lease expired
925        ensure!(lease.expire_time > lease.current, LeaderLeaseExpiredSnafu);
926        // Case 2: Everything is fine
927        Ok(lease.clone())
928    }
929
930    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
931    async fn step_down_without_lock(&self) -> Result<()> {
932        let key = self.election_key().into_bytes();
933        let leader_key = RdsLeaderKey {
934            name: self.leader_value.clone().into_bytes(),
935            key: key.clone(),
936            ..Default::default()
937        };
938        send_leader_change_and_set_flags(
939            &self.is_leader,
940            &self.leader_infancy,
941            &self.leader_watcher,
942            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
943        );
944        Ok(())
945    }
946
947    /// Elected as leader. The leader should put the key and notify the leader watcher.
948    /// Caution: Should only elected while holding the lock.
949    async fn elected(
950        &self,
951        mut executor: Executor<'_>,
952        expected_lease: Option<Lease>,
953    ) -> Result<()> {
954        let key = self.election_key();
955        let leader_key = RdsLeaderKey {
956            name: self.leader_value.clone().into_bytes(),
957            key: key.clone().into_bytes(),
958            ..Default::default()
959        };
960        let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
961        ensure!(
962            expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
963            LeaderLeaseChangedSnafu
964        );
965        self.delete_value(&key, &mut executor).await?;
966        self.put_value_with_lease(
967            &key,
968            &self.leader_value,
969            self.meta_lease_ttl.as_secs(),
970            &mut executor,
971        )
972        .await?;
973        executor.commit().await?;
974
975        send_leader_change_and_set_flags(
976            &self.is_leader,
977            &self.leader_infancy,
978            &self.leader_watcher,
979            LeaderChangeMessage::Elected(Arc::new(leader_key)),
980        );
981        Ok(())
982    }
983}
984
985#[cfg(test)]
986mod tests {
987    use std::assert_matches::assert_matches;
988    use std::env;
989
990    use common_meta::maybe_skip_mysql_integration_test;
991    use common_telemetry::init_default_ut_logging;
992
993    use super::*;
994    use crate::error;
995    use crate::utils::mysql::create_mysql_pool;
996
997    async fn create_mysql_client(
998        table_name: Option<&str>,
999        execution_timeout: Duration,
1000        wait_timeout: Duration,
1001    ) -> Result<Mutex<ElectionMysqlClient>> {
1002        init_default_ut_logging();
1003        let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
1004        if endpoint.is_empty() {
1005            return UnexpectedSnafu {
1006                violated: "MySQL endpoint is empty".to_string(),
1007            }
1008            .fail();
1009        }
1010        let pool = create_mysql_pool(&[endpoint], None).await.unwrap();
1011        let mut client = ElectionMysqlClient::new(
1012            pool,
1013            execution_timeout,
1014            execution_timeout,
1015            Duration::from_secs(1),
1016            wait_timeout,
1017            table_name.unwrap_or("default_greptime_metakv_election"),
1018        );
1019        client.maybe_init_client().await?;
1020        if table_name.is_some() {
1021            client.ensure_table_exists().await?;
1022        }
1023        Ok(Mutex::new(client))
1024    }
1025
1026    async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1027        let mut client = client.lock().await;
1028        let sql = format!("DROP TABLE IF EXISTS {};", table_name);
1029        client.execute(sqlx::query(&sql), &sql).await.unwrap();
1030    }
1031
    /// Exercises the raw key/value primitives (`put_value_with_lease`,
    /// `get_value_with_lease`, `update_value_with_lease`, `delete_value` and
    /// `get_value_with_lease_by_prefix`) against a real MySQL table.
    #[tokio::test]
    async fn test_mysql_crud() {
        maybe_skip_mysql_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_mysql_crud_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);

        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        // Run a locking read inside a transaction. The transaction is dropped at
        // the end of this scope without being committed.
        {
            let mut a = client.lock().await;
            let txn = a.transaction().await.unwrap();
            let mut executor = Executor::Txn(txn);
            let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
            let query = sqlx::query(&raw_query);
            let _ = executor.query(query, &raw_query).await.unwrap();
        }

        let (tx, _) = broadcast::channel(100);
        let mysql_election = MySqlElection {
            leader_value: "test_leader".to_string(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };
        // `executor` owns the client lock guard for the rest of the test.
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        // Single key: put -> get -> update -> delete -> get (gone).
        let res = mysql_election
            .put_value_with_lease(&key, &value, 10, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let lease = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        mysql_election
            .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
            .await
            .unwrap();

        let res = mysql_election
            .delete_value(&key, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let res = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap();
        assert!(res.is_none());

        // Multiple keys sharing the "test_key" prefix.
        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            mysql_election
                .put_value_with_lease(&key, &value, 10, &mut executor)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // With no matching rows, the returned timestamp is expected to be the default.
        let (res, current) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());

        // Should drop manually: `executor` holds the client lock that `drop_table`
        // needs to acquire.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1138
1139    async fn candidate(
1140        leader_value: String,
1141        candidate_lease_ttl: Duration,
1142        store_key_prefix: String,
1143        table_name: String,
1144    ) {
1145        let meta_lease_ttl = Duration::from_secs(2);
1146        let execution_timeout = Duration::from_secs(10);
1147        let idle_session_timeout = Duration::from_secs(0);
1148        let client =
1149            create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1150                .await
1151                .unwrap();
1152
1153        let (tx, _) = broadcast::channel(100);
1154        let mysql_election = MySqlElection {
1155            leader_value,
1156            client,
1157            is_leader: AtomicBool::new(false),
1158            leader_infancy: AtomicBool::new(true),
1159            leader_watcher: tx,
1160            store_key_prefix,
1161            candidate_lease_ttl,
1162            meta_lease_ttl,
1163            sql_set: ElectionSqlFactory::new(&table_name).build(),
1164        };
1165
1166        let node_info = MetasrvNodeInfo {
1167            addr: "test_addr".to_string(),
1168            version: "test_version".to_string(),
1169            git_commit: "test_git_commit".to_string(),
1170            start_time_ms: 0,
1171            total_cpu_millicores: 0,
1172            total_memory_bytes: 0,
1173            cpu_usage_millicores: 0,
1174            memory_usage_bytes: 0,
1175            hostname: "test_hostname".to_string(),
1176        };
1177        mysql_election.register_candidate(&node_info).await.unwrap();
1178    }
1179
    /// Ten concurrent candidates register themselves. They are all listed while
    /// alive and disappear from `all_candidates` once their leases expire after
    /// the tasks are aborted.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_mysql_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime_metakv";
        let mut handles = vec![];
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let mysql_election = MySqlElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid.clone(),
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        let candidates = mysql_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        // Stop the candidate tasks so their leases are no longer renewed.
        for handle in handles {
            handle.abort();
        }

        // Wait for the candidate leases to expire.
        tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
        let candidates = mysql_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // Garbage collection: expired candidate rows are still stored
        // (`delete_value` reports a deletion for each), so remove them explicitly.
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // Should drop manually: `executor` holds the client lock that `drop_table`
        // needs to acquire.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1250
1251    async fn elected(
1252        election: &MySqlElection,
1253        table_name: &str,
1254        expected_lease: Option<Lease>,
1255    ) -> Result<()> {
1256        let mut client = election.client.lock().await;
1257        let txn = client.transaction().await.unwrap();
1258        let mut executor = Executor::Txn(txn);
1259        let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1260        let query = sqlx::query(&raw_query);
1261        let _ = executor.query(query, &raw_query).await.unwrap();
1262        election.elected(executor, expected_lease).await
1263    }
1264
1265    async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1266        let client = election.client.lock().await;
1267        let mut executor = Executor::Default(client);
1268        election
1269            .get_value_with_lease(&election.election_key(), &mut executor)
1270            .await
1271            .unwrap()
1272    }
1273
    /// `elected` must refuse to take over when the expected lease does not match
    /// what is stored, and it must leave the table untouched.
    #[tokio::test]
    async fn test_elected_with_incorrect_lease_fails() {
        maybe_skip_mysql_integration_test!();
        let leader_value = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_failed_greptime_metakv";
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        let (tx, _) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value: leader_value.clone(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        // No election value is stored yet, so expecting `Some(..)` is a mismatch.
        let incorrect_lease = Lease::default();
        let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::LeaderLeaseChanged { .. });
        // Nothing was written.
        let lease = get_lease(&leader_mysql_election).await;
        assert!(lease.is_none());
        drop_table(&leader_mysql_election.client, table_name).await;
    }
1310
    /// After the server drops the idle connection, resetting the client must be
    /// enough to be re-elected with the previously observed lease.
    #[tokio::test]
    async fn test_reelection_with_idle_session_timeout() {
        maybe_skip_mysql_integration_test!();
        let leader_value = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_reelection_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        // Passed as the client's `wait_timeout` (presumably applied as the MySQL
        // session idle timeout — confirm in `ElectionMysqlClient`).
        let idle_session_timeout = Duration::from_secs(2);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        let (tx, _) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value: leader_value.clone(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        elected(&leader_mysql_election, table_name, None)
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());
        // Wait past the 2s idle timeout so the MySQL server closes the inactive connection.
        tokio::time::sleep(Duration::from_millis(2100)).await;
        // A query on the closed connection is expected to fail.
        leader_mysql_election
            .client
            .lock()
            .await
            .query(sqlx::query("SELECT 1"), "SELECT 1")
            .await
            .unwrap_err();
        // Reset the client.
        leader_mysql_election
            .client
            .lock()
            .await
            .reset_client()
            .await
            .unwrap();

        // Re-election with the previously observed lease must succeed.
        elected(&leader_mysql_election, table_name, Some(lease.clone()))
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());
        drop_table(&leader_mysql_election.client, table_name).await;
    }
1374
    /// Covers elect -> step down -> re-elect, checking the stored lease, the local
    /// leader flag, and the broadcast leader-change message at each step.
    #[tokio::test]
    async fn test_elected_and_step_down() {
        maybe_skip_mysql_integration_test!();
        let leader_value = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(5);
        let meta_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_and_step_down_greptime_metakv";
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        let (tx, mut rx) = broadcast::channel(100);
        let leader_mysql_election = MySqlElection {
            leader_value: leader_value.clone(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        // Elected from an empty table.
        elected(&leader_mysql_election, table_name, None)
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_mysql_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        // Stepping down only changes local state; the stored value is kept.
        leader_mysql_election
            .step_down_without_lock()
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(!leader_mysql_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::StepDown(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_mysql_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::StepDown"),
        }

        // Re-elected using the lease still stored in the table.
        elected(&leader_mysql_election, table_name, Some(lease.clone()))
            .await
            .unwrap();
        let lease = get_lease(&leader_mysql_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_mysql_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_mysql_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        drop_table(&leader_mysql_election.client, table_name).await;
    }
1467
1468    #[tokio::test]
1469    async fn test_campaign() {
1470        maybe_skip_mysql_integration_test!();
1471        let leader_value = "test_leader".to_string();
1472        let uuid = uuid::Uuid::new_v4().to_string();
1473        let table_name = "test_leader_action_greptime_metakv";
1474        let candidate_lease_ttl = Duration::from_secs(5);
1475        let meta_lease_ttl = Duration::from_secs(2);
1476        let execution_timeout = Duration::from_secs(10);
1477        let idle_session_timeout = Duration::from_secs(0);
1478        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1479            .await
1480            .unwrap();
1481
1482        let (tx, mut rx) = broadcast::channel(100);
1483        let leader_mysql_election = MySqlElection {
1484            leader_value: leader_value.clone(),
1485            client,
1486            is_leader: AtomicBool::new(false),
1487            leader_infancy: AtomicBool::new(true),
1488            leader_watcher: tx,
1489            store_key_prefix: uuid,
1490            candidate_lease_ttl,
1491            meta_lease_ttl,
1492            sql_set: ElectionSqlFactory::new(table_name).build(),
1493        };
1494
1495        // Step 1: No leader exists, campaign and elected.
1496        leader_mysql_election.do_campaign().await.unwrap();
1497        let lease = get_lease(&leader_mysql_election).await.unwrap();
1498        assert_eq!(lease.leader_value, leader_value);
1499        assert!(lease.expire_time > lease.current);
1500        assert!(leader_mysql_election.is_leader());
1501
1502        match rx.recv().await {
1503            Ok(LeaderChangeMessage::Elected(key)) => {
1504                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1505                assert_eq!(
1506                    String::from_utf8_lossy(key.key()),
1507                    leader_mysql_election.election_key()
1508                );
1509                assert_eq!(key.lease_id(), i64::default());
1510                assert_eq!(key.revision(), i64::default());
1511            }
1512            _ => panic!("Expected LeaderChangeMessage::Elected"),
1513        }
1514
1515        // Step 2: As a leader, renew the lease.
1516        leader_mysql_election.do_campaign().await.unwrap();
1517        let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1518        assert_eq!(lease.leader_value, leader_value);
1519        // The lease should be renewed.
1520        assert!(new_lease.expire_time > lease.expire_time);
1521        assert!(new_lease.expire_time > new_lease.current);
1522        assert!(leader_mysql_election.is_leader());
1523
1524        // Step 3: Something wrong, the leader lease expired.
1525        tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1526        leader_mysql_election.do_campaign().await.unwrap();
1527        let lease = get_lease(&leader_mysql_election).await.unwrap();
1528        assert_eq!(lease.leader_value, leader_value);
1529        assert!(lease.expire_time <= lease.current);
1530        assert!(!leader_mysql_election.is_leader());
1531
1532        match rx.recv().await {
1533            Ok(LeaderChangeMessage::StepDown(key)) => {
1534                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1535                assert_eq!(
1536                    String::from_utf8_lossy(key.key()),
1537                    leader_mysql_election.election_key()
1538                );
1539                assert_eq!(key.lease_id(), i64::default());
1540                assert_eq!(key.revision(), i64::default());
1541            }
1542            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1543        }
1544
1545        // Step 4: Re-elect itself.
1546        leader_mysql_election.do_campaign().await.unwrap();
1547        let lease = get_lease(&leader_mysql_election).await.unwrap();
1548        assert_eq!(lease.leader_value, leader_value);
1549        assert!(lease.expire_time > lease.current);
1550        assert!(leader_mysql_election.is_leader());
1551
1552        match rx.recv().await {
1553            Ok(LeaderChangeMessage::Elected(key)) => {
1554                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1555                assert_eq!(
1556                    String::from_utf8_lossy(key.key()),
1557                    leader_mysql_election.election_key()
1558                );
1559                assert_eq!(key.lease_id(), i64::default());
1560                assert_eq!(key.revision(), i64::default());
1561            }
1562            _ => panic!("Expected LeaderChangeMessage::Elected"),
1563        }
1564
1565        // Step 5: Something wrong, the leader key is deleted by other followers.
1566        {
1567            let client = leader_mysql_election.client.lock().await;
1568            let mut executor = Executor::Default(client);
1569            leader_mysql_election
1570                .delete_value(&leader_mysql_election.election_key(), &mut executor)
1571                .await
1572                .unwrap();
1573        }
1574        leader_mysql_election.do_campaign().await.unwrap();
1575        let res = get_lease(&leader_mysql_election).await;
1576        assert!(res.is_none());
1577        assert!(!leader_mysql_election.is_leader());
1578
1579        match rx.recv().await {
1580            Ok(LeaderChangeMessage::StepDown(key)) => {
1581                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1582                assert_eq!(
1583                    String::from_utf8_lossy(key.key()),
1584                    leader_mysql_election.election_key()
1585                );
1586                assert_eq!(key.lease_id(), i64::default());
1587                assert_eq!(key.revision(), i64::default());
1588            }
1589            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1590        }
1591
1592        // Step 6: Re-elect itself.
1593        leader_mysql_election.do_campaign().await.unwrap();
1594        let lease = get_lease(&leader_mysql_election).await.unwrap();
1595        assert_eq!(lease.leader_value, leader_value);
1596        assert!(lease.expire_time > lease.current);
1597        assert!(leader_mysql_election.is_leader());
1598
1599        match rx.recv().await {
1600            Ok(LeaderChangeMessage::Elected(key)) => {
1601                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1602                assert_eq!(
1603                    String::from_utf8_lossy(key.key()),
1604                    leader_mysql_election.election_key()
1605                );
1606                assert_eq!(key.lease_id(), i64::default());
1607                assert_eq!(key.revision(), i64::default());
1608            }
1609            _ => panic!("Expected LeaderChangeMessage::Elected"),
1610        }
1611
1612        // Step 7: Something wrong, the leader key changed by others.
1613        let another_leader_key = "another_leader";
1614        {
1615            let client = leader_mysql_election.client.lock().await;
1616            let mut executor = Executor::Default(client);
1617            leader_mysql_election
1618                .delete_value(&leader_mysql_election.election_key(), &mut executor)
1619                .await
1620                .unwrap();
1621            leader_mysql_election
1622                .put_value_with_lease(
1623                    &leader_mysql_election.election_key(),
1624                    another_leader_key,
1625                    10,
1626                    &mut executor,
1627                )
1628                .await
1629                .unwrap();
1630        }
1631        leader_mysql_election.do_campaign().await.unwrap();
1632        let lease = get_lease(&leader_mysql_election).await.unwrap();
1633        // Different from pg, mysql will not delete the key, just step down.
1634        assert_eq!(lease.leader_value, another_leader_key);
1635        assert!(lease.expire_time > lease.current);
1636        assert!(!leader_mysql_election.is_leader());
1637
1638        match rx.recv().await {
1639            Ok(LeaderChangeMessage::StepDown(key)) => {
1640                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1641                assert_eq!(
1642                    String::from_utf8_lossy(key.key()),
1643                    leader_mysql_election.election_key()
1644                );
1645                assert_eq!(key.lease_id(), i64::default());
1646                assert_eq!(key.revision(), i64::default());
1647            }
1648            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1649        }
1650
1651        drop_table(&leader_mysql_election.client, table_name).await;
1652    }
1653
1654    #[tokio::test]
1655    async fn test_follower_action() {
1656        maybe_skip_mysql_integration_test!();
1657        common_telemetry::init_default_ut_logging();
1658        let candidate_lease_ttl = Duration::from_secs(5);
1659        let meta_lease_ttl = Duration::from_secs(1);
1660        let execution_timeout = Duration::from_secs(10);
1661        let idle_session_timeout = Duration::from_secs(0);
1662        let uuid = uuid::Uuid::new_v4().to_string();
1663        let table_name = "test_follower_action_greptime_metakv";
1664
1665        let follower_client =
1666            create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1667                .await
1668                .unwrap();
1669        let (tx, mut rx) = broadcast::channel(100);
1670        let follower_mysql_election = MySqlElection {
1671            leader_value: "test_follower".to_string(),
1672            client: follower_client,
1673            is_leader: AtomicBool::new(false),
1674            leader_infancy: AtomicBool::new(true),
1675            leader_watcher: tx,
1676            store_key_prefix: uuid.clone(),
1677            candidate_lease_ttl,
1678            meta_lease_ttl,
1679            sql_set: ElectionSqlFactory::new(table_name).build(),
1680        };
1681
1682        let leader_client =
1683            create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1684                .await
1685                .unwrap();
1686        let (tx, _) = broadcast::channel(100);
1687        let leader_mysql_election = MySqlElection {
1688            leader_value: "test_leader".to_string(),
1689            client: leader_client,
1690            is_leader: AtomicBool::new(false),
1691            leader_infancy: AtomicBool::new(true),
1692            leader_watcher: tx,
1693            store_key_prefix: uuid,
1694            candidate_lease_ttl,
1695            meta_lease_ttl,
1696            sql_set: ElectionSqlFactory::new(table_name).build(),
1697        };
1698
1699        leader_mysql_election.do_campaign().await.unwrap();
1700
1701        // Step 1: As a follower, the leader exists and the lease is not expired. Do nothing.
1702        follower_mysql_election.do_campaign().await.unwrap();
1703
1704        // Step 2: As a follower, the leader exists but the lease expired. Re-elect itself.
1705        tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1706        follower_mysql_election.do_campaign().await.unwrap();
1707        assert!(follower_mysql_election.is_leader());
1708
1709        match rx.recv().await {
1710            Ok(LeaderChangeMessage::Elected(key)) => {
1711                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1712                assert_eq!(
1713                    String::from_utf8_lossy(key.key()),
1714                    follower_mysql_election.election_key()
1715                );
1716                assert_eq!(key.lease_id(), i64::default());
1717                assert_eq!(key.revision(), i64::default());
1718            }
1719            _ => panic!("Expected LeaderChangeMessage::Elected"),
1720        }
1721
1722        drop_table(&follower_mysql_election.client, table_name).await;
1723    }
1724
1725    #[tokio::test]
1726    async fn test_wait_timeout() {
1727        maybe_skip_mysql_integration_test!();
1728        common_telemetry::init_default_ut_logging();
1729        let execution_timeout = Duration::from_secs(10);
1730        let idle_session_timeout = Duration::from_secs(1);
1731
1732        let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1733            .await
1734            .unwrap();
1735        tokio::time::sleep(Duration::from_millis(1100)).await;
1736        // Wait for the idle session timeout.
1737        let err = client
1738            .lock()
1739            .await
1740            .query(sqlx::query("SELECT 1"), "SELECT 1")
1741            .await
1742            .unwrap_err();
1743        assert_matches!(err, error::Error::MySqlExecution { .. });
1744        // Reset the client and try again.
1745        client.lock().await.reset_client().await.unwrap();
1746        let _ = client
1747            .lock()
1748            .await
1749            .query(sqlx::query("SELECT 1"), "SELECT 1")
1750            .await
1751            .unwrap();
1752    }
1753}