meta_srv/election/rds/
mysql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_telemetry::{error, warn};
20use common_time::Timestamp;
21use snafu::{ensure, OptionExt, ResultExt};
22use sqlx::mysql::{MySqlArguments, MySqlRow};
23use sqlx::pool::PoolConnection;
24use sqlx::query::Query;
25use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
26use tokio::sync::{broadcast, Mutex, MutexGuard};
27use tokio::time::MissedTickBehavior;
28
29use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
30use crate::election::{
31    listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
32    CANDIDATES_ROOT, ELECTION_KEY,
33};
34use crate::error::{
35    AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36    LeaderLeaseChangedSnafu, LeaderLeaseExpiredSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result,
37    SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
40
/// Builds the SQL statements used by the election for one specific table.
struct ElectionSqlFactory<'a> {
    /// Name of the election table that every generated statement targets.
    table_name: &'a str,
}
44
/// The SQL statements used by the MySQL election, as produced by `ElectionSqlFactory::build`.
///
/// Every statement uses MySQL positional `?` placeholders; the parameter lists below
/// give the order in which values must be bound.
struct ElectionSqlSet {
    // SQL that locks the rows of the election table via `SELECT ... FOR UPDATE`.
    //
    // Parameters: none.
    campaign: String,
    // SQL to put a value with expire time.
    //
    // Parameters for the query (in bind order):
    // 1st `?`: key,
    // 2nd `?`: value,
    // 3rd `?`: lease time in seconds
    //
    // If the key already exists, the stored value is overwritten
    // (`ON DUPLICATE KEY UPDATE`).
    put_value_with_lease: String,
    // SQL to update a value with expire time.
    //
    // Parameters for the query (in bind order):
    // 1st `?`: updated value,
    // 2nd `?`: lease time in seconds
    // 3rd `?`: key,
    // 4th `?`: previous stored value (the row is only updated if `v` still equals it),
    update_value_with_lease: String,
    // SQL to get a value with expire time.
    //
    // Parameters:
    // 1st `?`: key
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease: String,
    // SQL to get all values with expire time with the given key prefix.
    //
    // Parameters:
    // 1st `?`: key prefix like 'prefix%'
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    // SQL to delete a value.
    //
    // Parameters:
    // `?`: key
    //
    // Returns:
    // Rows affected
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90    fn new(table_name: &'a str) -> Self {
91        Self { table_name }
92    }
93
94    fn build(self) -> ElectionSqlSet {
95        ElectionSqlSet {
96            campaign: self.campaign_sql(),
97            put_value_with_lease: self.put_value_with_lease_sql(),
98            update_value_with_lease: self.update_value_with_lease_sql(),
99            get_value_with_lease: self.get_value_with_lease_sql(),
100            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101            delete_value: self.delete_value_sql(),
102        }
103    }
104
105    /// Use `SELECT FOR UPDATE` to lock for compatibility with other MySQL-compatible databases
106    /// instead of directly using `GET_LOCK`.
107    fn campaign_sql(&self) -> String {
108        format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109    }
110
111    fn put_value_with_lease_sql(&self) -> String {
112        format!(
113            r#"
114            INSERT INTO `{}` (k, v) VALUES (
115                ?,
116                CONCAT(
117                    ?,
118                    '{}',
119                    DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120                )
121            )
122            ON DUPLICATE KEY UPDATE v = VALUES(v);
123            "#,
124            self.table_name, LEASE_SEP
125        )
126    }
127
128    fn update_value_with_lease_sql(&self) -> String {
129        format!(
130            r#"UPDATE `{}`
131               SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132               WHERE k = ? AND v = ?"#,
133            self.table_name, LEASE_SEP
134        )
135    }
136
137    fn get_value_with_lease_sql(&self) -> String {
138        format!(
139            r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140            self.table_name
141        )
142    }
143
144    fn get_value_with_lease_by_prefix_sql(&self) -> String {
145        format!(
146            r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147            self.table_name
148        )
149    }
150
151    fn delete_value_sql(&self) -> String {
152        format!("DELETE FROM {} WHERE k = ?;", self.table_name)
153    }
154}
155
/// The target that election statements are run against.
enum Executor<'a> {
    /// Runs statements directly on the locked client's connection.
    Default(MutexGuard<'a, ElectionMysqlClient>),
    /// Runs statements inside an open transaction.
    Txn(TransactionWithExecutionTimeout<'a>),
}
160
161impl Executor<'_> {
162    async fn query(
163        &mut self,
164        query: Query<'_, MySql, MySqlArguments>,
165        sql: &str,
166    ) -> Result<Vec<MySqlRow>> {
167        match self {
168            Executor::Default(client) => client.query(query, sql).await,
169            Executor::Txn(txn) => txn.query(query, sql).await,
170        }
171    }
172
173    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174        match self {
175            Executor::Default(client) => client.execute(query, sql).await,
176            Executor::Txn(txn) => txn.execute(query, sql).await,
177        }
178    }
179
180    async fn commit(self) -> Result<()> {
181        match self {
182            Executor::Txn(txn) => txn.commit().await,
183            _ => Ok(()),
184        }
185    }
186}
187
/// MySQL client for election.
pub struct ElectionMysqlClient {
    /// The connection currently used for election statements.
    ///
    /// `None` until `maybe_init_client` acquires one from `pool`.
    current: Option<PoolConnection<MySql>>,
    /// The pool that connections are acquired from.
    pool: MySqlPool,

    /// The client-side timeout for statement execution.
    ///
    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
    /// If a statement takes longer than this duration to execute, the client will abort the operation.
    execution_timeout: Duration,

    /// The maximum execution time for the statement.
    ///
    /// This timeout is enforced by the server and is independent of any client-side timeouts.
    /// If a statement takes longer than this duration to execute, the server will abort the operation.
    max_execution_time: Duration,

    /// The lock wait timeout for the session.
    ///
    /// This timeout determines how long the server waits for a lock to be acquired before timing out.
    /// If a lock cannot be acquired within this duration, the server will abort the operation.
    ///
    /// Note: the field name is a misspelling; the value is bound to the session variable
    /// `innodb_lock_wait_timeout`.
    innode_lock_wait_timeout: Duration,

    /// The wait timeout for the session.
    ///
    /// This timeout determines how long the server waits for activity on a noninteractive connection
    /// before closing it. If a connection is idle for longer than this duration, the server will
    /// terminate it.
    ///
    /// A zero duration means the session's `wait_timeout` is left untouched.
    wait_timeout: Duration,

    /// The table name for election.
    table_for_election: String,
}
221
222impl ElectionMysqlClient {
223    pub fn new(
224        pool: MySqlPool,
225        execution_timeout: Duration,
226        max_execution_time: Duration,
227        innode_lock_wait_timeout: Duration,
228        wait_timeout: Duration,
229        table_for_election: &str,
230    ) -> Self {
231        Self {
232            current: None,
233            pool,
234            execution_timeout,
235            max_execution_time,
236            innode_lock_wait_timeout,
237            wait_timeout,
238            table_for_election: table_for_election.to_string(),
239        }
240    }
241
242    fn create_table_sql(&self) -> String {
243        format!(
244            r#"
245            CREATE TABLE IF NOT EXISTS `{}` (
246                k VARBINARY(3072) PRIMARY KEY,
247                v BLOB
248            );"#,
249            self.table_for_election
250        )
251    }
252
253    fn insert_once_sql(&self) -> String {
254        format!(
255            "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256            self.table_for_election
257        )
258    }
259
260    fn check_version_sql(&self) -> String {
261        "SELECT @@version;".to_string()
262    }
263
264    async fn reset_client(&mut self) -> Result<()> {
265        self.current = None;
266        self.maybe_init_client().await
267    }
268
269    async fn ensure_table_exists(&mut self) -> Result<()> {
270        let create_table_sql = self.create_table_sql();
271        let query = sqlx::query(&create_table_sql);
272        self.execute(query, &create_table_sql).await?;
273        // Insert at least one row for `SELECT * FOR UPDATE` to work.
274        let insert_once_sql = self.insert_once_sql();
275        let query = sqlx::query(&insert_once_sql);
276        self.execute(query, &insert_once_sql).await?;
277        Ok(())
278    }
279
280    async fn maybe_init_client(&mut self) -> Result<()> {
281        if self.current.is_none() {
282            let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284            self.current = Some(client);
285            let (query, sql) = if !self.wait_timeout.is_zero() {
286                let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287                (
288                    sqlx::query(sql)
289                        .bind(self.wait_timeout.as_secs())
290                        .bind(self.innode_lock_wait_timeout.as_secs())
291                        .bind(self.max_execution_time.as_millis() as u64),
292                    sql,
293                )
294            } else {
295                let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296                (
297                    sqlx::query(sql)
298                        .bind(self.innode_lock_wait_timeout.as_secs())
299                        .bind(self.max_execution_time.as_millis() as u64),
300                    sql,
301                )
302            };
303            self.set_session_isolation_level().await?;
304            self.execute(query, sql).await?;
305            self.check_version(&self.check_version_sql()).await?;
306        }
307        Ok(())
308    }
309
310    /// Set session isolation level to serializable.
311    ///
312    /// # Panics
313    /// if `current` is `None`.
314    async fn set_session_isolation_level(&mut self) -> Result<()> {
315        let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316        let query = sqlx::query(sql);
317        self.execute(query, sql).await?;
318        Ok(())
319    }
320
321    /// Check if the MySQL version is supported.
322    ///
323    /// # Panics
324    /// if `current` is `None`.
325    async fn check_version(&mut self, sql: &str) -> Result<()> {
326        // Check if the MySQL version is supported.
327        let query = sqlx::query(sql);
328        // Safety: `maybe_init_client` ensures `current` is not `None`.
329        let client = self.current.as_mut().unwrap();
330        let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331            .await
332            .map_err(|_| {
333                SqlExecutionTimeoutSnafu {
334                    sql,
335                    duration: self.execution_timeout,
336                }
337                .build()
338            })?;
339        match row {
340            Ok(row) => {
341                let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342                if !version.starts_with("8.0") && !version.starts_with("5.7") {
343                    warn!(
344                        "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345                        version
346                    );
347                }
348            }
349            Err(e) => {
350                warn!(e; "Failed to check MySQL version through sql: {}", sql);
351            }
352        }
353        Ok(())
354    }
355
356    /// Returns the result of the query.
357    ///
358    /// # Panics
359    /// if `current` is `None`.
360    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361        // Safety: `maybe_init_client` ensures `current` is not `None`.
362        let client = self.current.as_mut().unwrap();
363        let future = query.execute(&mut **client);
364        let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365            .await
366            .map_err(|_| {
367                SqlExecutionTimeoutSnafu {
368                    sql,
369                    duration: self.execution_timeout,
370                }
371                .build()
372            })?
373            .context(MySqlExecutionSnafu { sql })?
374            .rows_affected();
375        Ok(rows_affected)
376    }
377
378    /// Returns the result of the query.
379    ///
380    /// # Panics
381    /// if `current` is `None`.
382    async fn query(
383        &mut self,
384        query: Query<'_, MySql, MySqlArguments>,
385        sql: &str,
386    ) -> Result<Vec<MySqlRow>> {
387        // Safety: `maybe_init_client` ensures `current` is not `None`.
388        let client = self.current.as_mut().unwrap();
389        let future = query.fetch_all(&mut **client);
390        tokio::time::timeout(self.execution_timeout, future)
391            .await
392            .map_err(|_| {
393                SqlExecutionTimeoutSnafu {
394                    sql,
395                    duration: self.execution_timeout,
396                }
397                .build()
398            })?
399            .context(MySqlExecutionSnafu { sql })
400    }
401
402    async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403        use sqlx::Acquire;
404        let client = self.current.as_mut().unwrap();
405        let transaction = client
406            .begin()
407            .await
408            .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410        Ok(TransactionWithExecutionTimeout {
411            transaction,
412            execution_timeout: self.execution_timeout,
413        })
414    }
415}
416
/// An open MySQL transaction whose statements are bounded by a client-side timeout.
struct TransactionWithExecutionTimeout<'a> {
    /// The underlying transaction.
    transaction: MySqlTransaction<'a>,
    /// Client-side timeout applied to every statement and to the commit.
    execution_timeout: Duration,
}
421
422impl TransactionWithExecutionTimeout<'_> {
423    async fn query(
424        &mut self,
425        query: Query<'_, MySql, MySqlArguments>,
426        sql: &str,
427    ) -> Result<Vec<MySqlRow>> {
428        let res = tokio::time::timeout(
429            self.execution_timeout,
430            query.fetch_all(&mut *self.transaction),
431        )
432        .await
433        .map_err(|_| {
434            SqlExecutionTimeoutSnafu {
435                sql,
436                duration: self.execution_timeout,
437            }
438            .build()
439        })?
440        .context(MySqlExecutionSnafu { sql })?;
441        Ok(res)
442    }
443
444    async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445        let res = tokio::time::timeout(
446            self.execution_timeout,
447            query.execute(&mut *self.transaction),
448        )
449        .await
450        .map_err(|_| {
451            SqlExecutionTimeoutSnafu {
452                sql,
453                duration: self.execution_timeout,
454            }
455            .build()
456        })?
457        .context(MySqlExecutionSnafu { sql })?;
458        Ok(res.rows_affected())
459    }
460
461    async fn commit(self) -> Result<()> {
462        tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463            .await
464            .map_err(|_| {
465                SqlExecutionTimeoutSnafu {
466                    sql: "COMMIT",
467                    duration: self.execution_timeout,
468                }
469                .build()
470            })?
471            .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472        Ok(())
473    }
474}
475
/// MySQL implementation of Election.
pub struct MySqlElection {
    /// The value identifying this node; stored as the leader value when this node holds
    /// the lease and appended to the candidate root to form the candidate key.
    leader_value: String,
    /// The MySQL client, guarded by an async mutex so that statements are serialized.
    client: Mutex<ElectionMysqlClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Flag that `in_leader_infancy` reports once and resets to `false`.
    leader_infancy: AtomicBool,
    /// Broadcast channel used to publish leader change messages.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Time-to-live of a candidate registration.
    candidate_lease_ttl: Duration,
    /// Time-to-live of the leader lease.
    meta_lease_ttl: Duration,
    /// The SQL statements rendered for the election table.
    sql_set: ElectionSqlSet,
}
488
489impl MySqlElection {
490    pub async fn with_mysql_client(
491        leader_value: String,
492        mut client: ElectionMysqlClient,
493        store_key_prefix: String,
494        candidate_lease_ttl: Duration,
495        meta_lease_ttl: Duration,
496        table_name: &str,
497    ) -> Result<ElectionRef> {
498        let sql_factory = ElectionSqlFactory::new(table_name);
499        client.maybe_init_client().await?;
500        client.ensure_table_exists().await?;
501        let tx = listen_leader_change(leader_value.clone());
502        Ok(Arc::new(Self {
503            leader_value,
504            client: Mutex::new(client),
505            is_leader: AtomicBool::new(false),
506            leader_infancy: AtomicBool::new(false),
507            leader_watcher: tx,
508            store_key_prefix,
509            candidate_lease_ttl,
510            meta_lease_ttl,
511            sql_set: sql_factory.build(),
512        }))
513    }
514
515    fn election_key(&self) -> String {
516        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517    }
518
519    fn candidate_root(&self) -> String {
520        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521    }
522
523    fn candidate_key(&self) -> String {
524        format!("{}{}", self.candidate_root(), self.leader_value)
525    }
526
527    async fn maybe_init_client(&self) -> Result<()> {
528        let mut client = self.client.lock().await;
529        client.maybe_init_client().await?;
530        Ok(())
531    }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536    type Leader = LeaderValue;
537
538    fn is_leader(&self) -> bool {
539        self.is_leader.load(Ordering::Relaxed)
540    }
541
542    fn in_leader_infancy(&self) -> bool {
543        self.leader_infancy
544            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545            .is_ok()
546    }
547
548    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549        self.maybe_init_client().await?;
550        let key = self.candidate_key();
551        let node_info =
552            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553                input: format!("{node_info:?}"),
554            })?;
555
556        {
557            let client = self.client.lock().await;
558            let mut executor = Executor::Default(client);
559            let res = self
560                .put_value_with_lease(
561                    &key,
562                    &node_info,
563                    self.candidate_lease_ttl.as_secs(),
564                    &mut executor,
565                )
566                .await?;
567            // May registered before, just update the lease.
568            if !res {
569                warn!("Candidate already registered, update the lease");
570                self.delete_value(&key, &mut executor).await?;
571                self.put_value_with_lease(
572                    &key,
573                    &node_info,
574                    self.candidate_lease_ttl.as_secs(),
575                    &mut executor,
576                )
577                .await?;
578            }
579        }
580
581        // Check if the current lease has expired and renew the lease.
582        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583        loop {
584            let _ = keep_alive_interval.tick().await;
585            let client = self.client.lock().await;
586            let mut executor = Executor::Default(client);
587            let lease = self
588                .get_value_with_lease(&key, &mut executor)
589                .await?
590                .unwrap_or_default();
591
592            ensure!(
593                lease.expire_time > lease.current,
594                UnexpectedSnafu {
595                    violated: format!(
596                        "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597                        lease.expire_time,
598                        lease.current,
599                        String::from_utf8_lossy(&key.into_bytes())
600                    ),
601                }
602            );
603
604            self.update_value_with_lease(
605                &key,
606                &lease.origin,
607                &node_info,
608                self.candidate_lease_ttl.as_secs(),
609                &mut executor,
610            )
611            .await?;
612            std::mem::drop(executor);
613        }
614    }
615
616    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617        self.maybe_init_client().await?;
618        let key_prefix = self.candidate_root();
619        let client = self.client.lock().await;
620        let mut executor = Executor::Default(client);
621        let (mut candidates, current) = self
622            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623            .await?;
624        // Remove expired candidates
625        candidates.retain(|c| c.1 > current);
626        let mut valid_candidates = Vec::with_capacity(candidates.len());
627        for (c, _) in candidates {
628            let node_info: MetasrvNodeInfo =
629                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630                    input: format!("{:?}", c),
631                })?;
632            valid_candidates.push(node_info);
633        }
634        Ok(valid_candidates)
635    }
636
637    async fn campaign(&self) -> Result<()> {
638        self.maybe_init_client().await?;
639        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641        loop {
642            self.do_campaign().await?;
643            keep_alive_interval.tick().await;
644        }
645    }
646
647    async fn reset_campaign(&self) {
648        if let Err(err) = self.client.lock().await.reset_client().await {
649            error!(err; "Failed to reset client");
650        }
651    }
652
653    async fn leader(&self) -> Result<Self::Leader> {
654        self.maybe_init_client().await?;
655        if self.is_leader.load(Ordering::Relaxed) {
656            Ok(self.leader_value.as_bytes().into())
657        } else {
658            let key = self.election_key();
659
660            let client = self.client.lock().await;
661            let mut executor = Executor::Default(client);
662            if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
663                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
664                Ok(lease.leader_value.as_bytes().into())
665            } else {
666                NoLeaderSnafu.fail()
667            }
668        }
669    }
670
671    async fn resign(&self) -> Result<()> {
672        todo!()
673    }
674
675    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
676        self.leader_watcher.subscribe()
677    }
678}
679
680impl MySqlElection {
681    /// Returns value, expire time and current time.
682    async fn get_value_with_lease(
683        &self,
684        key: &str,
685        executor: &mut Executor<'_>,
686    ) -> Result<Option<Lease>> {
687        let key = key.as_bytes();
688        let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
689        let res = executor
690            .query(query, &self.sql_set.get_value_with_lease)
691            .await?;
692
693        if res.is_empty() {
694            return Ok(None);
695        }
696        // Safety: Checked if res is empty above.
697        let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
698        let current_time = match Timestamp::from_str(&current_time_str, None) {
699            Ok(ts) => ts,
700            Err(_) => UnexpectedSnafu {
701                violated: format!("Invalid timestamp: {}", current_time_str),
702            }
703            .fail()?,
704        };
705        // Safety: Checked if res is empty above.
706        let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
707        let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
708
709        Ok(Some(Lease {
710            leader_value: value,
711            expire_time,
712            current: current_time,
713            origin: value_and_expire_time.to_string(),
714        }))
715    }
716
717    /// Returns all values and expire time with the given key prefix. Also returns the current time.
718    async fn get_value_with_lease_by_prefix(
719        &self,
720        key_prefix: &str,
721        executor: &mut Executor<'_>,
722    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
723        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
724        let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
725        let res = executor
726            .query(query, &self.sql_set.get_value_with_lease_by_prefix)
727            .await?;
728
729        let mut values_with_leases = vec![];
730        let mut current = Timestamp::default();
731        for row in res {
732            let current_time_str = row.try_get(1).unwrap_or_default();
733            current = match Timestamp::from_str(current_time_str, None) {
734                Ok(ts) => ts,
735                Err(_) => UnexpectedSnafu {
736                    violated: format!("Invalid timestamp: {}", current_time_str),
737                }
738                .fail()?,
739            };
740
741            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
742            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
743
744            values_with_leases.push((value, expire_time));
745        }
746        Ok((values_with_leases, current))
747    }
748
749    async fn update_value_with_lease(
750        &self,
751        key: &str,
752        prev: &str,
753        updated: &str,
754        lease_ttl: u64,
755        executor: &mut Executor<'_>,
756    ) -> Result<()> {
757        let key = key.as_bytes();
758        let prev = prev.as_bytes();
759        let updated = updated.as_bytes();
760
761        let query = sqlx::query(&self.sql_set.update_value_with_lease)
762            .bind(updated)
763            .bind(lease_ttl as f64)
764            .bind(key)
765            .bind(prev);
766        let res = executor
767            .execute(query, &self.sql_set.update_value_with_lease)
768            .await?;
769
770        ensure!(
771            res == 1,
772            UnexpectedSnafu {
773                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
774            }
775        );
776
777        Ok(())
778    }
779
780    /// Returns `true` if the insertion is successful
781    async fn put_value_with_lease(
782        &self,
783        key: &str,
784        value: &str,
785        lease_ttl_secs: u64,
786        executor: &mut Executor<'_>,
787    ) -> Result<bool> {
788        let key = key.as_bytes();
789        let lease_ttl_secs = lease_ttl_secs as f64;
790        let query = sqlx::query(&self.sql_set.put_value_with_lease)
791            .bind(key)
792            .bind(value)
793            .bind(lease_ttl_secs);
794        let res = executor
795            .execute(query, &self.sql_set.put_value_with_lease)
796            .await?;
797        Ok(res == 1)
798    }
799
800    /// Returns `true` if the deletion is successful.
801    /// Caution: Should only delete the key if the lease is expired.
802    async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
803        let key = key.as_bytes();
804        let query = sqlx::query(&self.sql_set.delete_value).bind(key);
805        let res = executor.execute(query, &self.sql_set.delete_value).await?;
806
807        Ok(res == 1)
808    }
809
    /// Runs a single round of the campaign.
    ///
    /// Reads the stored leader lease and then, depending on the outcome of `lease_check`,
    /// on whether this node believes it is the leader, and on whether the stored leader
    /// value is this node's value, renews the lease, steps down, or elects itself.
    ///
    /// The lease is read while holding the client lock, which is released before the
    /// decision is made; the renewing and electing branches lock the client again and
    /// run inside a transaction that first executes the `campaign` statement.
    async fn do_campaign(&self) -> Result<()> {
        // Read the current lease; the client lock is dropped at the end of this block.
        let lease = {
            let client = self.client.lock().await;
            let mut executor = Executor::Default(client);
            self.get_value_with_lease(&self.election_key(), &mut executor)
                .await?
        };

        let is_leader = self.is_leader();
        // If current leader value is the same as the leader value in the remote lease,
        // it means the current leader is still valid.
        let is_current_leader = lease
            .as_ref()
            .map(|lease| lease.leader_value == self.leader_value)
            .unwrap_or(false);
        // Tuple: (lease check result, local leader flag, stored leader value is mine).
        match (self.lease_check(&lease), is_leader, is_current_leader) {
            // If the leader lease is valid and I'm the leader, renew the lease.
            (Ok(_), true, true) => {
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                // Lock the election table before touching the lease.
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                // `is_current_leader` is only true when the lease is `Some`.
                self.renew_lease(executor, lease.unwrap()).await?;
            }
            // If the leader lease expires and I'm the leader, notify the leader watcher and step down.
            // Another instance should be elected as the leader in this case.
            (Err(err), true, _) => {
                warn!(err; "Leader lease expired, step down...");
                self.step_down_without_lock().await?;
            }
            // The lease check passed but the stored leader value is not mine while I still
            // believe I'm the leader: step down as well.
            (Ok(_), true, false) => {
                warn!("Leader lease expired, step down...");
                self.step_down_without_lock().await?;
            }
            // If the leader lease expires and I'm not the leader, elect myself.
            (Err(err), false, _) => {
                warn!(err; "Leader lease expired, elect myself.");
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                // Lock the election table before touching the lease.
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                self.elected(executor, lease).await?;
            }
            // The stored lease is valid and names me as the leader, but my local flag says
            // I'm not the leader. Just re-elect myself by renewing the lease.
            (Ok(_), false, true) => {
                warn!("I should be the leader, but I don't think so. Something went wrong.");
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                // Lock the election table before touching the lease.
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                // `is_current_leader` is only true when the lease is `Some`.
                self.renew_lease(executor, lease.unwrap()).await?;
            }
            // If the leader lease is valid and I'm not the leader, do nothing.
            (Ok(_), false, false) => {}
        }
        Ok(())
    }
875
876    /// Renew the lease
877    async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
878        let key = self.election_key();
879        self.update_value_with_lease(
880            &key,
881            &lease.origin,
882            &self.leader_value,
883            self.meta_lease_ttl.as_secs(),
884            &mut executor,
885        )
886        .await?;
887        executor.commit().await?;
888
889        if !self.is_leader() {
890            let key = self.election_key();
891            let leader_key = RdsLeaderKey {
892                name: self.leader_value.clone().into_bytes(),
893                key: key.clone().into_bytes(),
894                ..Default::default()
895            };
896            send_leader_change_and_set_flags(
897                &self.is_leader,
898                &self.leader_infancy,
899                &self.leader_watcher,
900                LeaderChangeMessage::Elected(Arc::new(leader_key)),
901            );
902        }
903
904        Ok(())
905    }
906
907    /// Performs a lease check during the election process.
908    ///
909    /// This function performs the following checks and actions:
910    ///
911    /// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error
912    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
913    ///   will be released.
914    /// - **Case 2**: If all checks pass, the function returns without performing any actions.
915    fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
916        let lease = lease.as_ref().context(NoLeaderSnafu)?;
917        // Case 1: Lease expired
918        ensure!(lease.expire_time > lease.current, LeaderLeaseExpiredSnafu);
919        // Case 2: Everything is fine
920        Ok(lease.clone())
921    }
922
923    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
924    async fn step_down_without_lock(&self) -> Result<()> {
925        let key = self.election_key().into_bytes();
926        let leader_key = RdsLeaderKey {
927            name: self.leader_value.clone().into_bytes(),
928            key: key.clone(),
929            ..Default::default()
930        };
931        send_leader_change_and_set_flags(
932            &self.is_leader,
933            &self.leader_infancy,
934            &self.leader_watcher,
935            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
936        );
937        Ok(())
938    }
939
940    /// Elected as leader. The leader should put the key and notify the leader watcher.
941    /// Caution: Should only elected while holding the lock.
942    async fn elected(
943        &self,
944        mut executor: Executor<'_>,
945        expected_lease: Option<Lease>,
946    ) -> Result<()> {
947        let key = self.election_key();
948        let leader_key = RdsLeaderKey {
949            name: self.leader_value.clone().into_bytes(),
950            key: key.clone().into_bytes(),
951            ..Default::default()
952        };
953        let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
954        ensure!(
955            expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
956            LeaderLeaseChangedSnafu
957        );
958        self.delete_value(&key, &mut executor).await?;
959        self.put_value_with_lease(
960            &key,
961            &self.leader_value,
962            self.meta_lease_ttl.as_secs(),
963            &mut executor,
964        )
965        .await?;
966        executor.commit().await?;
967
968        send_leader_change_and_set_flags(
969            &self.is_leader,
970            &self.leader_infancy,
971            &self.leader_watcher,
972            LeaderChangeMessage::Elected(Arc::new(leader_key)),
973        );
974        Ok(())
975    }
976}
977
978#[cfg(test)]
979mod tests {
980    use std::assert_matches::assert_matches;
981    use std::env;
982
983    use common_meta::maybe_skip_mysql_integration_test;
984    use common_telemetry::init_default_ut_logging;
985
986    use super::*;
987    use crate::bootstrap::create_mysql_pool;
988    use crate::error;
989
990    async fn create_mysql_client(
991        table_name: Option<&str>,
992        execution_timeout: Duration,
993        wait_timeout: Duration,
994    ) -> Result<Mutex<ElectionMysqlClient>> {
995        init_default_ut_logging();
996        let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
997        if endpoint.is_empty() {
998            return UnexpectedSnafu {
999                violated: "MySQL endpoint is empty".to_string(),
1000            }
1001            .fail();
1002        }
1003        let pool = create_mysql_pool(&[endpoint]).await.unwrap();
1004        let mut client = ElectionMysqlClient::new(
1005            pool,
1006            execution_timeout,
1007            execution_timeout,
1008            Duration::from_secs(1),
1009            wait_timeout,
1010            table_name.unwrap_or("default_greptime_metakv_election"),
1011        );
1012        client.maybe_init_client().await?;
1013        if table_name.is_some() {
1014            client.ensure_table_exists().await?;
1015        }
1016        Ok(Mutex::new(client))
1017    }
1018
1019    async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1020        let mut client = client.lock().await;
1021        let sql = format!("DROP TABLE IF EXISTS {};", table_name);
1022        client.execute(sqlx::query(&sql), &sql).await.unwrap();
1023    }
1024
1025    #[tokio::test]
1026    async fn test_mysql_crud() {
1027        maybe_skip_mysql_integration_test!();
1028        let key = "test_key".to_string();
1029        let value = "test_value".to_string();
1030
1031        let uuid = uuid::Uuid::new_v4().to_string();
1032        let table_name = "test_mysql_crud_greptime_metakv";
1033        let candidate_lease_ttl = Duration::from_secs(10);
1034        let meta_lease_ttl = Duration::from_secs(2);
1035
1036        let execution_timeout = Duration::from_secs(10);
1037        let idle_session_timeout = Duration::from_secs(0);
1038        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1039            .await
1040            .unwrap();
1041
1042        {
1043            let mut a = client.lock().await;
1044            let txn = a.transaction().await.unwrap();
1045            let mut executor = Executor::Txn(txn);
1046            let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1047            let query = sqlx::query(&raw_query);
1048            let _ = executor.query(query, &raw_query).await.unwrap();
1049        }
1050
1051        let (tx, _) = broadcast::channel(100);
1052        let mysql_election = MySqlElection {
1053            leader_value: "test_leader".to_string(),
1054            client,
1055            is_leader: AtomicBool::new(false),
1056            leader_infancy: AtomicBool::new(true),
1057            leader_watcher: tx,
1058            store_key_prefix: uuid,
1059            candidate_lease_ttl,
1060            meta_lease_ttl,
1061            sql_set: ElectionSqlFactory::new(table_name).build(),
1062        };
1063        let client = mysql_election.client.lock().await;
1064        let mut executor = Executor::Default(client);
1065        let res = mysql_election
1066            .put_value_with_lease(&key, &value, 10, &mut executor)
1067            .await
1068            .unwrap();
1069        assert!(res);
1070
1071        let lease = mysql_election
1072            .get_value_with_lease(&key, &mut executor)
1073            .await
1074            .unwrap()
1075            .unwrap();
1076        assert_eq!(lease.leader_value, value);
1077
1078        mysql_election
1079            .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
1080            .await
1081            .unwrap();
1082
1083        let res = mysql_election
1084            .delete_value(&key, &mut executor)
1085            .await
1086            .unwrap();
1087        assert!(res);
1088
1089        let res = mysql_election
1090            .get_value_with_lease(&key, &mut executor)
1091            .await
1092            .unwrap();
1093        assert!(res.is_none());
1094
1095        for i in 0..10 {
1096            let key = format!("test_key_{}", i);
1097            let value = format!("test_value_{}", i);
1098            mysql_election
1099                .put_value_with_lease(&key, &value, 10, &mut executor)
1100                .await
1101                .unwrap();
1102        }
1103
1104        let key_prefix = "test_key".to_string();
1105        let (res, _) = mysql_election
1106            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
1107            .await
1108            .unwrap();
1109        assert_eq!(res.len(), 10);
1110
1111        for i in 0..10 {
1112            let key = format!("test_key_{}", i);
1113            let res = mysql_election
1114                .delete_value(&key, &mut executor)
1115                .await
1116                .unwrap();
1117            assert!(res);
1118        }
1119
1120        let (res, current) = mysql_election
1121            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
1122            .await
1123            .unwrap();
1124        assert!(res.is_empty());
1125        assert!(current == Timestamp::default());
1126
1127        // Should drop manually.
1128        std::mem::drop(executor);
1129        drop_table(&mysql_election.client, table_name).await;
1130    }
1131
1132    async fn candidate(
1133        leader_value: String,
1134        candidate_lease_ttl: Duration,
1135        store_key_prefix: String,
1136        table_name: String,
1137    ) {
1138        let meta_lease_ttl = Duration::from_secs(2);
1139        let execution_timeout = Duration::from_secs(10);
1140        let idle_session_timeout = Duration::from_secs(0);
1141        let client =
1142            create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1143                .await
1144                .unwrap();
1145
1146        let (tx, _) = broadcast::channel(100);
1147        let mysql_election = MySqlElection {
1148            leader_value,
1149            client,
1150            is_leader: AtomicBool::new(false),
1151            leader_infancy: AtomicBool::new(true),
1152            leader_watcher: tx,
1153            store_key_prefix,
1154            candidate_lease_ttl,
1155            meta_lease_ttl,
1156            sql_set: ElectionSqlFactory::new(&table_name).build(),
1157        };
1158
1159        let node_info = MetasrvNodeInfo {
1160            addr: "test_addr".to_string(),
1161            version: "test_version".to_string(),
1162            git_commit: "test_git_commit".to_string(),
1163            start_time_ms: 0,
1164        };
1165        mysql_election.register_candidate(&node_info).await.unwrap();
1166    }
1167
1168    #[tokio::test]
1169    async fn test_candidate_registration() {
1170        maybe_skip_mysql_integration_test!();
1171        let leader_value_prefix = "test_leader".to_string();
1172        let candidate_lease_ttl = Duration::from_secs(2);
1173        let execution_timeout = Duration::from_secs(10);
1174        let meta_lease_ttl = Duration::from_secs(2);
1175        let idle_session_timeout = Duration::from_secs(0);
1176        let uuid = uuid::Uuid::new_v4().to_string();
1177        let table_name = "test_candidate_registration_greptime_metakv";
1178        let mut handles = vec![];
1179        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1180            .await
1181            .unwrap();
1182
1183        for i in 0..10 {
1184            let leader_value = format!("{}{}", leader_value_prefix, i);
1185            let handle = tokio::spawn(candidate(
1186                leader_value,
1187                candidate_lease_ttl,
1188                uuid.clone(),
1189                table_name.to_string(),
1190            ));
1191            handles.push(handle);
1192        }
1193        // Wait for candidates to registrate themselves and renew their leases at least once.
1194        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
1195
1196        let (tx, _) = broadcast::channel(100);
1197        let leader_value = "test_leader".to_string();
1198        let mysql_election = MySqlElection {
1199            leader_value,
1200            client,
1201            is_leader: AtomicBool::new(false),
1202            leader_infancy: AtomicBool::new(true),
1203            leader_watcher: tx,
1204            store_key_prefix: uuid.clone(),
1205            candidate_lease_ttl,
1206            meta_lease_ttl,
1207            sql_set: ElectionSqlFactory::new(table_name).build(),
1208        };
1209
1210        let candidates = mysql_election.all_candidates().await.unwrap();
1211        assert_eq!(candidates.len(), 10);
1212
1213        for handle in handles {
1214            handle.abort();
1215        }
1216
1217        // Wait for the candidate leases to expire.
1218        tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
1219        let candidates = mysql_election.all_candidates().await.unwrap();
1220        assert!(candidates.is_empty());
1221
1222        // Garbage collection
1223        let client = mysql_election.client.lock().await;
1224        let mut executor = Executor::Default(client);
1225        for i in 0..10 {
1226            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1227            let res = mysql_election
1228                .delete_value(&key, &mut executor)
1229                .await
1230                .unwrap();
1231            assert!(res);
1232        }
1233
1234        // Should drop manually.
1235        std::mem::drop(executor);
1236        drop_table(&mysql_election.client, table_name).await;
1237    }
1238
1239    async fn elected(
1240        election: &MySqlElection,
1241        table_name: &str,
1242        expected_lease: Option<Lease>,
1243    ) -> Result<()> {
1244        let mut client = election.client.lock().await;
1245        let txn = client.transaction().await.unwrap();
1246        let mut executor = Executor::Txn(txn);
1247        let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1248        let query = sqlx::query(&raw_query);
1249        let _ = executor.query(query, &raw_query).await.unwrap();
1250        election.elected(executor, expected_lease).await
1251    }
1252
1253    async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1254        let client = election.client.lock().await;
1255        let mut executor = Executor::Default(client);
1256        election
1257            .get_value_with_lease(&election.election_key(), &mut executor)
1258            .await
1259            .unwrap()
1260    }
1261
1262    #[tokio::test]
1263    async fn test_elected_with_incorrect_lease_fails() {
1264        maybe_skip_mysql_integration_test!();
1265        let leader_value = "test_leader".to_string();
1266        let candidate_lease_ttl = Duration::from_secs(5);
1267        let meta_lease_ttl = Duration::from_secs(2);
1268        let execution_timeout = Duration::from_secs(10);
1269        let idle_session_timeout = Duration::from_secs(0);
1270        let uuid = uuid::Uuid::new_v4().to_string();
1271        let table_name = "test_elected_failed_greptime_metakv";
1272        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1273            .await
1274            .unwrap();
1275
1276        let (tx, _) = broadcast::channel(100);
1277        let leader_mysql_election = MySqlElection {
1278            leader_value: leader_value.clone(),
1279            client,
1280            is_leader: AtomicBool::new(false),
1281            leader_infancy: AtomicBool::new(true),
1282            leader_watcher: tx,
1283            store_key_prefix: uuid,
1284            candidate_lease_ttl,
1285            meta_lease_ttl,
1286            sql_set: ElectionSqlFactory::new(table_name).build(),
1287        };
1288
1289        let incorrect_lease = Lease::default();
1290        let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
1291            .await
1292            .unwrap_err();
1293        assert_matches!(err, error::Error::LeaderLeaseChanged { .. });
1294        let lease = get_lease(&leader_mysql_election).await;
1295        assert!(lease.is_none());
1296        drop_table(&leader_mysql_election.client, table_name).await;
1297    }
1298
1299    #[tokio::test]
1300    async fn test_reelection_with_idle_session_timeout() {
1301        maybe_skip_mysql_integration_test!();
1302        let leader_value = "test_leader".to_string();
1303        let uuid = uuid::Uuid::new_v4().to_string();
1304        let table_name = "test_reelection_greptime_metakv";
1305        let candidate_lease_ttl = Duration::from_secs(5);
1306        let meta_lease_ttl = Duration::from_secs(5);
1307        let execution_timeout = Duration::from_secs(10);
1308        let idle_session_timeout = Duration::from_secs(2);
1309        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1310            .await
1311            .unwrap();
1312
1313        let (tx, _) = broadcast::channel(100);
1314        let leader_mysql_election = MySqlElection {
1315            leader_value: leader_value.clone(),
1316            client,
1317            is_leader: AtomicBool::new(false),
1318            leader_infancy: AtomicBool::new(true),
1319            leader_watcher: tx,
1320            store_key_prefix: uuid,
1321            candidate_lease_ttl,
1322            meta_lease_ttl,
1323            sql_set: ElectionSqlFactory::new(table_name).build(),
1324        };
1325
1326        elected(&leader_mysql_election, table_name, None)
1327            .await
1328            .unwrap();
1329        let lease = get_lease(&leader_mysql_election).await.unwrap();
1330        assert_eq!(lease.leader_value, leader_value);
1331        assert!(lease.expire_time > lease.current);
1332        assert!(leader_mysql_election.is_leader());
1333        // Wait for mysql server close the inactive connection.
1334        tokio::time::sleep(Duration::from_millis(2100)).await;
1335        // Should be failed.
1336        leader_mysql_election
1337            .client
1338            .lock()
1339            .await
1340            .query(sqlx::query("SELECT 1"), "SELECT 1")
1341            .await
1342            .unwrap_err();
1343        // Reset the client.
1344        leader_mysql_election
1345            .client
1346            .lock()
1347            .await
1348            .reset_client()
1349            .await
1350            .unwrap();
1351
1352        // Should able to re-elected.
1353        elected(&leader_mysql_election, table_name, Some(lease.clone()))
1354            .await
1355            .unwrap();
1356        let lease = get_lease(&leader_mysql_election).await.unwrap();
1357        assert_eq!(lease.leader_value, leader_value);
1358        assert!(lease.expire_time > lease.current);
1359        assert!(leader_mysql_election.is_leader());
1360        drop_table(&leader_mysql_election.client, table_name).await;
1361    }
1362
1363    #[tokio::test]
1364    async fn test_elected_and_step_down() {
1365        maybe_skip_mysql_integration_test!();
1366        let leader_value = "test_leader".to_string();
1367        let candidate_lease_ttl = Duration::from_secs(5);
1368        let meta_lease_ttl = Duration::from_secs(2);
1369        let execution_timeout = Duration::from_secs(10);
1370        let idle_session_timeout = Duration::from_secs(0);
1371        let uuid = uuid::Uuid::new_v4().to_string();
1372        let table_name = "test_elected_and_step_down_greptime_metakv";
1373        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1374            .await
1375            .unwrap();
1376
1377        let (tx, mut rx) = broadcast::channel(100);
1378        let leader_mysql_election = MySqlElection {
1379            leader_value: leader_value.clone(),
1380            client,
1381            is_leader: AtomicBool::new(false),
1382            leader_infancy: AtomicBool::new(true),
1383            leader_watcher: tx,
1384            store_key_prefix: uuid,
1385            candidate_lease_ttl,
1386            meta_lease_ttl,
1387            sql_set: ElectionSqlFactory::new(table_name).build(),
1388        };
1389
1390        elected(&leader_mysql_election, table_name, None)
1391            .await
1392            .unwrap();
1393        let lease = get_lease(&leader_mysql_election).await.unwrap();
1394        assert_eq!(lease.leader_value, leader_value);
1395        assert!(lease.expire_time > lease.current);
1396        assert!(leader_mysql_election.is_leader());
1397
1398        match rx.recv().await {
1399            Ok(LeaderChangeMessage::Elected(key)) => {
1400                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1401                assert_eq!(
1402                    String::from_utf8_lossy(key.key()),
1403                    leader_mysql_election.election_key()
1404                );
1405                assert_eq!(key.lease_id(), i64::default());
1406                assert_eq!(key.revision(), i64::default());
1407            }
1408            _ => panic!("Expected LeaderChangeMessage::Elected"),
1409        }
1410
1411        leader_mysql_election
1412            .step_down_without_lock()
1413            .await
1414            .unwrap();
1415        let lease = get_lease(&leader_mysql_election).await.unwrap();
1416        assert_eq!(lease.leader_value, leader_value);
1417        assert!(!leader_mysql_election.is_leader());
1418
1419        match rx.recv().await {
1420            Ok(LeaderChangeMessage::StepDown(key)) => {
1421                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1422                assert_eq!(
1423                    String::from_utf8_lossy(key.key()),
1424                    leader_mysql_election.election_key()
1425                );
1426                assert_eq!(key.lease_id(), i64::default());
1427                assert_eq!(key.revision(), i64::default());
1428            }
1429            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1430        }
1431
1432        elected(&leader_mysql_election, table_name, Some(lease.clone()))
1433            .await
1434            .unwrap();
1435        let lease = get_lease(&leader_mysql_election).await.unwrap();
1436        assert_eq!(lease.leader_value, leader_value);
1437        assert!(lease.expire_time > lease.current);
1438        assert!(leader_mysql_election.is_leader());
1439
1440        match rx.recv().await {
1441            Ok(LeaderChangeMessage::Elected(key)) => {
1442                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1443                assert_eq!(
1444                    String::from_utf8_lossy(key.key()),
1445                    leader_mysql_election.election_key()
1446                );
1447                assert_eq!(key.lease_id(), i64::default());
1448                assert_eq!(key.revision(), i64::default());
1449            }
1450            _ => panic!("Expected LeaderChangeMessage::Elected"),
1451        }
1452
1453        drop_table(&leader_mysql_election.client, table_name).await;
1454    }
1455
1456    #[tokio::test]
1457    async fn test_campaign() {
1458        maybe_skip_mysql_integration_test!();
1459        let leader_value = "test_leader".to_string();
1460        let uuid = uuid::Uuid::new_v4().to_string();
1461        let table_name = "test_leader_action_greptime_metakv";
1462        let candidate_lease_ttl = Duration::from_secs(5);
1463        let meta_lease_ttl = Duration::from_secs(2);
1464        let execution_timeout = Duration::from_secs(10);
1465        let idle_session_timeout = Duration::from_secs(0);
1466        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1467            .await
1468            .unwrap();
1469
1470        let (tx, mut rx) = broadcast::channel(100);
1471        let leader_mysql_election = MySqlElection {
1472            leader_value: leader_value.clone(),
1473            client,
1474            is_leader: AtomicBool::new(false),
1475            leader_infancy: AtomicBool::new(true),
1476            leader_watcher: tx,
1477            store_key_prefix: uuid,
1478            candidate_lease_ttl,
1479            meta_lease_ttl,
1480            sql_set: ElectionSqlFactory::new(table_name).build(),
1481        };
1482
1483        // Step 1: No leader exists, campaign and elected.
1484        leader_mysql_election.do_campaign().await.unwrap();
1485        let lease = get_lease(&leader_mysql_election).await.unwrap();
1486        assert_eq!(lease.leader_value, leader_value);
1487        assert!(lease.expire_time > lease.current);
1488        assert!(leader_mysql_election.is_leader());
1489
1490        match rx.recv().await {
1491            Ok(LeaderChangeMessage::Elected(key)) => {
1492                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1493                assert_eq!(
1494                    String::from_utf8_lossy(key.key()),
1495                    leader_mysql_election.election_key()
1496                );
1497                assert_eq!(key.lease_id(), i64::default());
1498                assert_eq!(key.revision(), i64::default());
1499            }
1500            _ => panic!("Expected LeaderChangeMessage::Elected"),
1501        }
1502
1503        // Step 2: As a leader, renew the lease.
1504        leader_mysql_election.do_campaign().await.unwrap();
1505        let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1506        assert_eq!(lease.leader_value, leader_value);
1507        // The lease should be renewed.
1508        assert!(new_lease.expire_time > lease.expire_time);
1509        assert!(new_lease.expire_time > new_lease.current);
1510        assert!(leader_mysql_election.is_leader());
1511
1512        // Step 3: Something wrong, the leader lease expired.
1513        tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1514        leader_mysql_election.do_campaign().await.unwrap();
1515        let lease = get_lease(&leader_mysql_election).await.unwrap();
1516        assert_eq!(lease.leader_value, leader_value);
1517        assert!(lease.expire_time <= lease.current);
1518        assert!(!leader_mysql_election.is_leader());
1519
1520        match rx.recv().await {
1521            Ok(LeaderChangeMessage::StepDown(key)) => {
1522                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1523                assert_eq!(
1524                    String::from_utf8_lossy(key.key()),
1525                    leader_mysql_election.election_key()
1526                );
1527                assert_eq!(key.lease_id(), i64::default());
1528                assert_eq!(key.revision(), i64::default());
1529            }
1530            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1531        }
1532
1533        // Step 4: Re-elect itself.
1534        leader_mysql_election.do_campaign().await.unwrap();
1535        let lease = get_lease(&leader_mysql_election).await.unwrap();
1536        assert_eq!(lease.leader_value, leader_value);
1537        assert!(lease.expire_time > lease.current);
1538        assert!(leader_mysql_election.is_leader());
1539
1540        match rx.recv().await {
1541            Ok(LeaderChangeMessage::Elected(key)) => {
1542                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1543                assert_eq!(
1544                    String::from_utf8_lossy(key.key()),
1545                    leader_mysql_election.election_key()
1546                );
1547                assert_eq!(key.lease_id(), i64::default());
1548                assert_eq!(key.revision(), i64::default());
1549            }
1550            _ => panic!("Expected LeaderChangeMessage::Elected"),
1551        }
1552
1553        // Step 5: Something wrong, the leader key is deleted by other followers.
1554        {
1555            let client = leader_mysql_election.client.lock().await;
1556            let mut executor = Executor::Default(client);
1557            leader_mysql_election
1558                .delete_value(&leader_mysql_election.election_key(), &mut executor)
1559                .await
1560                .unwrap();
1561        }
1562        leader_mysql_election.do_campaign().await.unwrap();
1563        let res = get_lease(&leader_mysql_election).await;
1564        assert!(res.is_none());
1565        assert!(!leader_mysql_election.is_leader());
1566
1567        match rx.recv().await {
1568            Ok(LeaderChangeMessage::StepDown(key)) => {
1569                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1570                assert_eq!(
1571                    String::from_utf8_lossy(key.key()),
1572                    leader_mysql_election.election_key()
1573                );
1574                assert_eq!(key.lease_id(), i64::default());
1575                assert_eq!(key.revision(), i64::default());
1576            }
1577            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1578        }
1579
1580        // Step 6: Re-elect itself.
1581        leader_mysql_election.do_campaign().await.unwrap();
1582        let lease = get_lease(&leader_mysql_election).await.unwrap();
1583        assert_eq!(lease.leader_value, leader_value);
1584        assert!(lease.expire_time > lease.current);
1585        assert!(leader_mysql_election.is_leader());
1586
1587        match rx.recv().await {
1588            Ok(LeaderChangeMessage::Elected(key)) => {
1589                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1590                assert_eq!(
1591                    String::from_utf8_lossy(key.key()),
1592                    leader_mysql_election.election_key()
1593                );
1594                assert_eq!(key.lease_id(), i64::default());
1595                assert_eq!(key.revision(), i64::default());
1596            }
1597            _ => panic!("Expected LeaderChangeMessage::Elected"),
1598        }
1599
1600        // Step 7: Something wrong, the leader key changed by others.
1601        let another_leader_key = "another_leader";
1602        {
1603            let client = leader_mysql_election.client.lock().await;
1604            let mut executor = Executor::Default(client);
1605            leader_mysql_election
1606                .delete_value(&leader_mysql_election.election_key(), &mut executor)
1607                .await
1608                .unwrap();
1609            leader_mysql_election
1610                .put_value_with_lease(
1611                    &leader_mysql_election.election_key(),
1612                    another_leader_key,
1613                    10,
1614                    &mut executor,
1615                )
1616                .await
1617                .unwrap();
1618        }
1619        leader_mysql_election.do_campaign().await.unwrap();
1620        let lease = get_lease(&leader_mysql_election).await.unwrap();
1621        // Different from pg, mysql will not delete the key, just step down.
1622        assert_eq!(lease.leader_value, another_leader_key);
1623        assert!(lease.expire_time > lease.current);
1624        assert!(!leader_mysql_election.is_leader());
1625
1626        match rx.recv().await {
1627            Ok(LeaderChangeMessage::StepDown(key)) => {
1628                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1629                assert_eq!(
1630                    String::from_utf8_lossy(key.key()),
1631                    leader_mysql_election.election_key()
1632                );
1633                assert_eq!(key.lease_id(), i64::default());
1634                assert_eq!(key.revision(), i64::default());
1635            }
1636            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1637        }
1638
1639        drop_table(&leader_mysql_election.client, table_name).await;
1640    }
1641
1642    #[tokio::test]
1643    async fn test_follower_action() {
1644        maybe_skip_mysql_integration_test!();
1645        common_telemetry::init_default_ut_logging();
1646        let candidate_lease_ttl = Duration::from_secs(5);
1647        let meta_lease_ttl = Duration::from_secs(1);
1648        let execution_timeout = Duration::from_secs(10);
1649        let idle_session_timeout = Duration::from_secs(0);
1650        let uuid = uuid::Uuid::new_v4().to_string();
1651        let table_name = "test_follower_action_greptime_metakv";
1652
1653        let follower_client =
1654            create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1655                .await
1656                .unwrap();
1657        let (tx, mut rx) = broadcast::channel(100);
1658        let follower_mysql_election = MySqlElection {
1659            leader_value: "test_follower".to_string(),
1660            client: follower_client,
1661            is_leader: AtomicBool::new(false),
1662            leader_infancy: AtomicBool::new(true),
1663            leader_watcher: tx,
1664            store_key_prefix: uuid.clone(),
1665            candidate_lease_ttl,
1666            meta_lease_ttl,
1667            sql_set: ElectionSqlFactory::new(table_name).build(),
1668        };
1669
1670        let leader_client =
1671            create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1672                .await
1673                .unwrap();
1674        let (tx, _) = broadcast::channel(100);
1675        let leader_mysql_election = MySqlElection {
1676            leader_value: "test_leader".to_string(),
1677            client: leader_client,
1678            is_leader: AtomicBool::new(false),
1679            leader_infancy: AtomicBool::new(true),
1680            leader_watcher: tx,
1681            store_key_prefix: uuid,
1682            candidate_lease_ttl,
1683            meta_lease_ttl,
1684            sql_set: ElectionSqlFactory::new(table_name).build(),
1685        };
1686
1687        leader_mysql_election.do_campaign().await.unwrap();
1688
1689        // Step 1: As a follower, the leader exists and the lease is not expired. Do nothing.
1690        follower_mysql_election.do_campaign().await.unwrap();
1691
1692        // Step 2: As a follower, the leader exists but the lease expired. Re-elect itself.
1693        tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1694        follower_mysql_election.do_campaign().await.unwrap();
1695        assert!(follower_mysql_election.is_leader());
1696
1697        match rx.recv().await {
1698            Ok(LeaderChangeMessage::Elected(key)) => {
1699                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1700                assert_eq!(
1701                    String::from_utf8_lossy(key.key()),
1702                    follower_mysql_election.election_key()
1703                );
1704                assert_eq!(key.lease_id(), i64::default());
1705                assert_eq!(key.revision(), i64::default());
1706            }
1707            _ => panic!("Expected LeaderChangeMessage::Elected"),
1708        }
1709
1710        drop_table(&follower_mysql_election.client, table_name).await;
1711    }
1712
1713    #[tokio::test]
1714    async fn test_wait_timeout() {
1715        maybe_skip_mysql_integration_test!();
1716        common_telemetry::init_default_ut_logging();
1717        let execution_timeout = Duration::from_secs(10);
1718        let idle_session_timeout = Duration::from_secs(1);
1719
1720        let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1721            .await
1722            .unwrap();
1723        tokio::time::sleep(Duration::from_millis(1100)).await;
1724        // Wait for the idle session timeout.
1725        let err = client
1726            .lock()
1727            .await
1728            .query(sqlx::query("SELECT 1"), "SELECT 1")
1729            .await
1730            .unwrap_err();
1731        assert_matches!(err, error::Error::MySqlExecution { .. });
1732        // Reset the client and try again.
1733        client.lock().await.reset_client().await.unwrap();
1734        let _ = client
1735            .lock()
1736            .await
1737            .query(sqlx::query("SELECT 1"), "SELECT 1")
1738            .await
1739            .unwrap();
1740    }
1741}