1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use snafu::{OptionExt, ResultExt, ensure};
23use sqlx::mysql::{MySqlArguments, MySqlRow};
24use sqlx::pool::PoolConnection;
25use sqlx::query::Query;
26use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
27use tokio::sync::{Mutex, MutexGuard, broadcast};
28use tokio::time::MissedTickBehavior;
29
30use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
31use crate::election::{
32 Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
33};
34use crate::error::{
35 AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36 LeaderLeaseChangedSnafu, LeaderLeaseExpiredSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result,
37 SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
40
/// Renders the SQL statements used by the MySQL election against one table.
struct ElectionSqlFactory<'a> {
    /// Name of the election table interpolated into every generated statement.
    table_name: &'a str,
}
44
/// The complete set of pre-rendered SQL statements used by the election.
struct ElectionSqlSet {
    /// Selects every row of the election table `FOR UPDATE`.
    campaign: String,
    /// Inserts a key with a value and lease expiry, overwriting the value on a duplicate key.
    ///
    /// Placeholders, in order: key, value, lease TTL in seconds.
    put_value_with_lease: String,
    /// Replaces the value and lease expiry of a key only if the stored value still matches.
    ///
    /// Placeholders, in order: new value, lease TTL in seconds, key, previous stored value.
    update_value_with_lease: String,
    /// Reads the stored value of one key together with the database's current time.
    get_value_with_lease: String,
    /// Reads the stored values of all keys matching a `LIKE` pattern, together with the
    /// database's current time.
    get_value_with_lease_by_prefix: String,
    /// Deletes one key.
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90 fn new(table_name: &'a str) -> Self {
91 Self { table_name }
92 }
93
94 fn build(self) -> ElectionSqlSet {
95 ElectionSqlSet {
96 campaign: self.campaign_sql(),
97 put_value_with_lease: self.put_value_with_lease_sql(),
98 update_value_with_lease: self.update_value_with_lease_sql(),
99 get_value_with_lease: self.get_value_with_lease_sql(),
100 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101 delete_value: self.delete_value_sql(),
102 }
103 }
104
105 fn campaign_sql(&self) -> String {
108 format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109 }
110
111 fn put_value_with_lease_sql(&self) -> String {
112 format!(
113 r#"
114 INSERT INTO `{}` (k, v) VALUES (
115 ?,
116 CONCAT(
117 ?,
118 '{}',
119 DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120 )
121 )
122 ON DUPLICATE KEY UPDATE v = VALUES(v);
123 "#,
124 self.table_name, LEASE_SEP
125 )
126 }
127
128 fn update_value_with_lease_sql(&self) -> String {
129 format!(
130 r#"UPDATE `{}`
131 SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132 WHERE k = ? AND v = ?"#,
133 self.table_name, LEASE_SEP
134 )
135 }
136
137 fn get_value_with_lease_sql(&self) -> String {
138 format!(
139 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140 self.table_name
141 )
142 }
143
144 fn get_value_with_lease_by_prefix_sql(&self) -> String {
145 format!(
146 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147 self.table_name
148 )
149 }
150
151 fn delete_value_sql(&self) -> String {
152 format!("DELETE FROM {} WHERE k = ?;", self.table_name)
153 }
154}
155
156enum Executor<'a> {
157 Default(MutexGuard<'a, ElectionMysqlClient>),
158 Txn(TransactionWithExecutionTimeout<'a>),
159}
160
161impl Executor<'_> {
162 async fn query(
163 &mut self,
164 query: Query<'_, MySql, MySqlArguments>,
165 sql: &str,
166 ) -> Result<Vec<MySqlRow>> {
167 match self {
168 Executor::Default(client) => client.query(query, sql).await,
169 Executor::Txn(txn) => txn.query(query, sql).await,
170 }
171 }
172
173 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174 match self {
175 Executor::Default(client) => client.execute(query, sql).await,
176 Executor::Txn(txn) => txn.execute(query, sql).await,
177 }
178 }
179
180 async fn commit(self) -> Result<()> {
181 match self {
182 Executor::Txn(txn) => txn.commit().await,
183 _ => Ok(()),
184 }
185 }
186}
187
188pub struct ElectionMysqlClient {
190 current: Option<PoolConnection<MySql>>,
191 pool: MySqlPool,
192
193 execution_timeout: Duration,
198
199 max_execution_time: Duration,
204
205 innode_lock_wait_timeout: Duration,
210
211 wait_timeout: Duration,
217
218 table_for_election: String,
220}
221
222impl ElectionMysqlClient {
223 pub fn new(
224 pool: MySqlPool,
225 execution_timeout: Duration,
226 max_execution_time: Duration,
227 innode_lock_wait_timeout: Duration,
228 wait_timeout: Duration,
229 table_for_election: &str,
230 ) -> Self {
231 Self {
232 current: None,
233 pool,
234 execution_timeout,
235 max_execution_time,
236 innode_lock_wait_timeout,
237 wait_timeout,
238 table_for_election: table_for_election.to_string(),
239 }
240 }
241
242 fn create_table_sql(&self) -> String {
243 format!(
244 r#"
245 CREATE TABLE IF NOT EXISTS `{}` (
246 k VARBINARY(3072) PRIMARY KEY,
247 v BLOB
248 );"#,
249 self.table_for_election
250 )
251 }
252
253 fn insert_once_sql(&self) -> String {
254 format!(
255 "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256 self.table_for_election
257 )
258 }
259
260 fn check_version_sql(&self) -> String {
261 "SELECT @@version;".to_string()
262 }
263
264 async fn reset_client(&mut self) -> Result<()> {
265 self.current = None;
266 self.maybe_init_client().await
267 }
268
269 async fn ensure_table_exists(&mut self) -> Result<()> {
270 let create_table_sql = self.create_table_sql();
271 let query = sqlx::query(&create_table_sql);
272 self.execute(query, &create_table_sql).await?;
273 let insert_once_sql = self.insert_once_sql();
275 let query = sqlx::query(&insert_once_sql);
276 self.execute(query, &insert_once_sql).await?;
277 Ok(())
278 }
279
280 async fn maybe_init_client(&mut self) -> Result<()> {
281 if self.current.is_none() {
282 let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284 self.current = Some(client);
285 let (query, sql) = if !self.wait_timeout.is_zero() {
286 let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287 (
288 sqlx::query(sql)
289 .bind(self.wait_timeout.as_secs())
290 .bind(self.innode_lock_wait_timeout.as_secs())
291 .bind(self.max_execution_time.as_millis() as u64),
292 sql,
293 )
294 } else {
295 let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296 (
297 sqlx::query(sql)
298 .bind(self.innode_lock_wait_timeout.as_secs())
299 .bind(self.max_execution_time.as_millis() as u64),
300 sql,
301 )
302 };
303 self.set_session_isolation_level().await?;
304 self.execute(query, sql).await?;
305 self.check_version(&self.check_version_sql()).await?;
306 }
307 Ok(())
308 }
309
310 async fn set_session_isolation_level(&mut self) -> Result<()> {
315 let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316 let query = sqlx::query(sql);
317 self.execute(query, sql).await?;
318 Ok(())
319 }
320
321 async fn check_version(&mut self, sql: &str) -> Result<()> {
326 let query = sqlx::query(sql);
328 let client = self.current.as_mut().unwrap();
330 let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331 .await
332 .map_err(|_| {
333 SqlExecutionTimeoutSnafu {
334 sql,
335 duration: self.execution_timeout,
336 }
337 .build()
338 })?;
339 match row {
340 Ok(row) => {
341 let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342 if !version.starts_with("8.0") && !version.starts_with("5.7") {
343 warn!(
344 "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345 version
346 );
347 }
348 }
349 Err(e) => {
350 warn!(e; "Failed to check MySQL version through sql: {}", sql);
351 }
352 }
353 Ok(())
354 }
355
356 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361 let client = self.current.as_mut().unwrap();
363 let future = query.execute(&mut **client);
364 let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365 .await
366 .map_err(|_| {
367 SqlExecutionTimeoutSnafu {
368 sql,
369 duration: self.execution_timeout,
370 }
371 .build()
372 })?
373 .context(MySqlExecutionSnafu { sql })?
374 .rows_affected();
375 Ok(rows_affected)
376 }
377
378 async fn query(
383 &mut self,
384 query: Query<'_, MySql, MySqlArguments>,
385 sql: &str,
386 ) -> Result<Vec<MySqlRow>> {
387 let client = self.current.as_mut().unwrap();
389 let future = query.fetch_all(&mut **client);
390 tokio::time::timeout(self.execution_timeout, future)
391 .await
392 .map_err(|_| {
393 SqlExecutionTimeoutSnafu {
394 sql,
395 duration: self.execution_timeout,
396 }
397 .build()
398 })?
399 .context(MySqlExecutionSnafu { sql })
400 }
401
402 async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403 use sqlx::Acquire;
404 let client = self.current.as_mut().unwrap();
405 let transaction = client
406 .begin()
407 .await
408 .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410 Ok(TransactionWithExecutionTimeout {
411 transaction,
412 execution_timeout: self.execution_timeout,
413 })
414 }
415}
416
417struct TransactionWithExecutionTimeout<'a> {
418 transaction: MySqlTransaction<'a>,
419 execution_timeout: Duration,
420}
421
422impl TransactionWithExecutionTimeout<'_> {
423 async fn query(
424 &mut self,
425 query: Query<'_, MySql, MySqlArguments>,
426 sql: &str,
427 ) -> Result<Vec<MySqlRow>> {
428 let res = tokio::time::timeout(
429 self.execution_timeout,
430 query.fetch_all(&mut *self.transaction),
431 )
432 .await
433 .map_err(|_| {
434 SqlExecutionTimeoutSnafu {
435 sql,
436 duration: self.execution_timeout,
437 }
438 .build()
439 })?
440 .context(MySqlExecutionSnafu { sql })?;
441 Ok(res)
442 }
443
444 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445 let res = tokio::time::timeout(
446 self.execution_timeout,
447 query.execute(&mut *self.transaction),
448 )
449 .await
450 .map_err(|_| {
451 SqlExecutionTimeoutSnafu {
452 sql,
453 duration: self.execution_timeout,
454 }
455 .build()
456 })?
457 .context(MySqlExecutionSnafu { sql })?;
458 Ok(res.rows_affected())
459 }
460
461 async fn commit(self) -> Result<()> {
462 tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463 .await
464 .map_err(|_| {
465 SqlExecutionTimeoutSnafu {
466 sql: "COMMIT",
467 duration: self.execution_timeout,
468 }
469 .build()
470 })?
471 .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472 Ok(())
473 }
474}
475
476pub struct MySqlElection {
478 leader_value: String,
479 client: Mutex<ElectionMysqlClient>,
480 is_leader: AtomicBool,
481 leader_infancy: AtomicBool,
482 leader_watcher: broadcast::Sender<LeaderChangeMessage>,
483 store_key_prefix: String,
484 candidate_lease_ttl: Duration,
485 meta_lease_ttl: Duration,
486 sql_set: ElectionSqlSet,
487}
488
489impl MySqlElection {
490 pub async fn with_mysql_client(
491 leader_value: String,
492 mut client: ElectionMysqlClient,
493 store_key_prefix: String,
494 candidate_lease_ttl: Duration,
495 meta_lease_ttl: Duration,
496 table_name: &str,
497 ) -> Result<ElectionRef> {
498 let sql_factory = ElectionSqlFactory::new(table_name);
499 client.maybe_init_client().await?;
500 client.ensure_table_exists().await?;
501 let tx = listen_leader_change(leader_value.clone());
502 Ok(Arc::new(Self {
503 leader_value,
504 client: Mutex::new(client),
505 is_leader: AtomicBool::new(false),
506 leader_infancy: AtomicBool::new(false),
507 leader_watcher: tx,
508 store_key_prefix,
509 candidate_lease_ttl,
510 meta_lease_ttl,
511 sql_set: sql_factory.build(),
512 }))
513 }
514
515 fn election_key(&self) -> String {
516 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517 }
518
519 fn candidate_root(&self) -> String {
520 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521 }
522
523 fn candidate_key(&self) -> String {
524 format!("{}{}", self.candidate_root(), self.leader_value)
525 }
526
527 async fn maybe_init_client(&self) -> Result<()> {
528 let mut client = self.client.lock().await;
529 client.maybe_init_client().await?;
530 Ok(())
531 }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536 type Leader = LeaderValue;
537
538 fn is_leader(&self) -> bool {
539 self.is_leader.load(Ordering::Relaxed)
540 }
541
542 fn in_leader_infancy(&self) -> bool {
543 self.leader_infancy
544 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545 .is_ok()
546 }
547
548 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549 self.maybe_init_client().await?;
550 let key = self.candidate_key();
551 let node_info =
552 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553 input: format!("{node_info:?}"),
554 })?;
555
556 {
557 let client = self.client.lock().await;
558 let mut executor = Executor::Default(client);
559 let res = self
560 .put_value_with_lease(
561 &key,
562 &node_info,
563 self.candidate_lease_ttl.as_secs(),
564 &mut executor,
565 )
566 .await?;
567 if !res {
569 warn!("Candidate already registered, update the lease");
570 self.delete_value(&key, &mut executor).await?;
571 self.put_value_with_lease(
572 &key,
573 &node_info,
574 self.candidate_lease_ttl.as_secs(),
575 &mut executor,
576 )
577 .await?;
578 }
579 }
580
581 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583 loop {
584 let _ = keep_alive_interval.tick().await;
585 let client = self.client.lock().await;
586 let mut executor = Executor::Default(client);
587 let lease = self
588 .get_value_with_lease(&key, &mut executor)
589 .await?
590 .unwrap_or_default();
591
592 ensure!(
593 lease.expire_time > lease.current,
594 UnexpectedSnafu {
595 violated: format!(
596 "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597 lease.expire_time,
598 lease.current,
599 String::from_utf8_lossy(&key.into_bytes())
600 ),
601 }
602 );
603
604 self.update_value_with_lease(
605 &key,
606 &lease.origin,
607 &node_info,
608 self.candidate_lease_ttl.as_secs(),
609 &mut executor,
610 )
611 .await?;
612 std::mem::drop(executor);
613 }
614 }
615
616 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617 self.maybe_init_client().await?;
618 let key_prefix = self.candidate_root();
619 let client = self.client.lock().await;
620 let mut executor = Executor::Default(client);
621 let (mut candidates, current) = self
622 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623 .await?;
624 candidates.retain(|c| c.1 > current);
626 let mut valid_candidates = Vec::with_capacity(candidates.len());
627 for (c, _) in candidates {
628 let node_info: MetasrvNodeInfo =
629 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630 input: format!("{:?}", c),
631 })?;
632 valid_candidates.push(node_info);
633 }
634 Ok(valid_candidates)
635 }
636
637 async fn campaign(&self) -> Result<()> {
638 self.maybe_init_client().await?;
639 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641 loop {
642 self.do_campaign().await?;
643 keep_alive_interval.tick().await;
644 }
645 }
646
647 async fn reset_campaign(&self) {
648 if let Err(err) = self.client.lock().await.reset_client().await {
649 error!(err; "Failed to reset client");
650 }
651 }
652
653 async fn leader(&self) -> Result<Self::Leader> {
654 self.maybe_init_client().await?;
655 if self.is_leader.load(Ordering::Relaxed) {
656 Ok(self.leader_value.as_bytes().into())
657 } else {
658 let key = self.election_key();
659
660 let client = self.client.lock().await;
661 let mut executor = Executor::Default(client);
662 if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
663 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
664 Ok(lease.leader_value.as_bytes().into())
665 } else {
666 NoLeaderSnafu.fail()
667 }
668 }
669 }
670
671 async fn resign(&self) -> Result<()> {
672 todo!()
673 }
674
675 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
676 self.leader_watcher.subscribe()
677 }
678}
679
680impl MySqlElection {
681 async fn get_value_with_lease(
683 &self,
684 key: &str,
685 executor: &mut Executor<'_>,
686 ) -> Result<Option<Lease>> {
687 let key = key.as_bytes();
688 let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
689 let res = executor
690 .query(query, &self.sql_set.get_value_with_lease)
691 .await?;
692
693 if res.is_empty() {
694 return Ok(None);
695 }
696 let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
698 let current_time = match Timestamp::from_str(¤t_time_str, None) {
699 Ok(ts) => ts,
700 Err(_) => UnexpectedSnafu {
701 violated: format!("Invalid timestamp: {}", current_time_str),
702 }
703 .fail()?,
704 };
705 let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
707 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
708
709 Ok(Some(Lease {
710 leader_value: value,
711 expire_time,
712 current: current_time,
713 origin: value_and_expire_time.to_string(),
714 }))
715 }
716
717 async fn get_value_with_lease_by_prefix(
719 &self,
720 key_prefix: &str,
721 executor: &mut Executor<'_>,
722 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
723 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
724 let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
725 let res = executor
726 .query(query, &self.sql_set.get_value_with_lease_by_prefix)
727 .await?;
728
729 let mut values_with_leases = vec![];
730 let mut current = Timestamp::default();
731 for row in res {
732 let current_time_str = row.try_get(1).unwrap_or_default();
733 current = match Timestamp::from_str(current_time_str, None) {
734 Ok(ts) => ts,
735 Err(_) => UnexpectedSnafu {
736 violated: format!("Invalid timestamp: {}", current_time_str),
737 }
738 .fail()?,
739 };
740
741 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
742 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
743
744 values_with_leases.push((value, expire_time));
745 }
746 Ok((values_with_leases, current))
747 }
748
749 async fn update_value_with_lease(
750 &self,
751 key: &str,
752 prev: &str,
753 updated: &str,
754 lease_ttl: u64,
755 executor: &mut Executor<'_>,
756 ) -> Result<()> {
757 let key = key.as_bytes();
758 let prev = prev.as_bytes();
759 let updated = updated.as_bytes();
760
761 let query = sqlx::query(&self.sql_set.update_value_with_lease)
762 .bind(updated)
763 .bind(lease_ttl as f64)
764 .bind(key)
765 .bind(prev);
766 let res = executor
767 .execute(query, &self.sql_set.update_value_with_lease)
768 .await?;
769
770 ensure!(
771 res == 1,
772 UnexpectedSnafu {
773 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
774 }
775 );
776
777 Ok(())
778 }
779
780 async fn put_value_with_lease(
782 &self,
783 key: &str,
784 value: &str,
785 lease_ttl_secs: u64,
786 executor: &mut Executor<'_>,
787 ) -> Result<bool> {
788 let key = key.as_bytes();
789 let lease_ttl_secs = lease_ttl_secs as f64;
790 let query = sqlx::query(&self.sql_set.put_value_with_lease)
791 .bind(key)
792 .bind(value)
793 .bind(lease_ttl_secs);
794 let res = executor
795 .execute(query, &self.sql_set.put_value_with_lease)
796 .await?;
797 Ok(res == 1)
798 }
799
800 async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
803 let key = key.as_bytes();
804 let query = sqlx::query(&self.sql_set.delete_value).bind(key);
805 let res = executor.execute(query, &self.sql_set.delete_value).await?;
806
807 Ok(res == 1)
808 }
809
810 async fn do_campaign(&self) -> Result<()> {
813 let lease = {
814 let client = self.client.lock().await;
815 let mut executor = Executor::Default(client);
816 self.get_value_with_lease(&self.election_key(), &mut executor)
817 .await?
818 };
819
820 let is_leader = self.is_leader();
821 let is_current_leader = lease
824 .as_ref()
825 .map(|lease| lease.leader_value == self.leader_value)
826 .unwrap_or(false);
827 match (self.lease_check(&lease), is_leader, is_current_leader) {
828 (Ok(_), true, true) => {
830 let mut client = self.client.lock().await;
831 let txn = client.transaction().await?;
832 let mut executor = Executor::Txn(txn);
833 let query = sqlx::query(&self.sql_set.campaign);
834 executor.query(query, &self.sql_set.campaign).await?;
835 self.renew_lease(executor, lease.unwrap()).await?;
837 }
838 (Err(err), true, _) => {
841 warn!(err; "Leader lease expired, step down...");
842 self.step_down_without_lock().await?;
843 }
844 (Ok(_), true, false) => {
845 warn!("Leader lease expired, step down...");
846 self.step_down_without_lock().await?;
847 }
848 (Err(err), false, _) => {
850 warn!(err; "Leader lease expired, elect myself.");
851 let mut client = self.client.lock().await;
852 let txn = client.transaction().await?;
853 let mut executor = Executor::Txn(txn);
854 let query = sqlx::query(&self.sql_set.campaign);
855 executor.query(query, &self.sql_set.campaign).await?;
856 self.elected(executor, lease).await?;
857 }
858 (Ok(_), false, true) => {
861 warn!("I should be the leader, but I don't think so. Something went wrong.");
862 let mut client = self.client.lock().await;
863 let txn = client.transaction().await?;
864 let mut executor = Executor::Txn(txn);
865 let query = sqlx::query(&self.sql_set.campaign);
866 executor.query(query, &self.sql_set.campaign).await?;
867 self.renew_lease(executor, lease.unwrap()).await?;
869 }
870 (Ok(_), false, false) => {}
872 }
873 Ok(())
874 }
875
876 async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
878 let key = self.election_key();
879 self.update_value_with_lease(
880 &key,
881 &lease.origin,
882 &self.leader_value,
883 self.meta_lease_ttl.as_secs(),
884 &mut executor,
885 )
886 .await?;
887 executor.commit().await?;
888
889 if !self.is_leader() {
890 let key = self.election_key();
891 let leader_key = RdsLeaderKey {
892 name: self.leader_value.clone().into_bytes(),
893 key: key.clone().into_bytes(),
894 ..Default::default()
895 };
896 send_leader_change_and_set_flags(
897 &self.is_leader,
898 &self.leader_infancy,
899 &self.leader_watcher,
900 LeaderChangeMessage::Elected(Arc::new(leader_key)),
901 );
902 }
903
904 Ok(())
905 }
906
907 fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
916 let lease = lease.as_ref().context(NoLeaderSnafu)?;
917 ensure!(lease.expire_time > lease.current, LeaderLeaseExpiredSnafu);
919 Ok(lease.clone())
921 }
922
923 async fn step_down_without_lock(&self) -> Result<()> {
925 let key = self.election_key().into_bytes();
926 let leader_key = RdsLeaderKey {
927 name: self.leader_value.clone().into_bytes(),
928 key: key.clone(),
929 ..Default::default()
930 };
931 send_leader_change_and_set_flags(
932 &self.is_leader,
933 &self.leader_infancy,
934 &self.leader_watcher,
935 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
936 );
937 Ok(())
938 }
939
940 async fn elected(
943 &self,
944 mut executor: Executor<'_>,
945 expected_lease: Option<Lease>,
946 ) -> Result<()> {
947 let key = self.election_key();
948 let leader_key = RdsLeaderKey {
949 name: self.leader_value.clone().into_bytes(),
950 key: key.clone().into_bytes(),
951 ..Default::default()
952 };
953 let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
954 ensure!(
955 expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
956 LeaderLeaseChangedSnafu
957 );
958 self.delete_value(&key, &mut executor).await?;
959 self.put_value_with_lease(
960 &key,
961 &self.leader_value,
962 self.meta_lease_ttl.as_secs(),
963 &mut executor,
964 )
965 .await?;
966 executor.commit().await?;
967
968 send_leader_change_and_set_flags(
969 &self.is_leader,
970 &self.leader_infancy,
971 &self.leader_watcher,
972 LeaderChangeMessage::Elected(Arc::new(leader_key)),
973 );
974 Ok(())
975 }
976}
977
978#[cfg(test)]
979mod tests {
980 use std::assert_matches::assert_matches;
981 use std::env;
982
983 use common_meta::maybe_skip_mysql_integration_test;
984 use common_telemetry::init_default_ut_logging;
985
986 use super::*;
987 use crate::error;
988 use crate::utils::mysql::create_mysql_pool;
989
990 async fn create_mysql_client(
991 table_name: Option<&str>,
992 execution_timeout: Duration,
993 wait_timeout: Duration,
994 ) -> Result<Mutex<ElectionMysqlClient>> {
995 init_default_ut_logging();
996 let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
997 if endpoint.is_empty() {
998 return UnexpectedSnafu {
999 violated: "MySQL endpoint is empty".to_string(),
1000 }
1001 .fail();
1002 }
1003 let pool = create_mysql_pool(&[endpoint], None).await.unwrap();
1004 let mut client = ElectionMysqlClient::new(
1005 pool,
1006 execution_timeout,
1007 execution_timeout,
1008 Duration::from_secs(1),
1009 wait_timeout,
1010 table_name.unwrap_or("default_greptime_metakv_election"),
1011 );
1012 client.maybe_init_client().await?;
1013 if table_name.is_some() {
1014 client.ensure_table_exists().await?;
1015 }
1016 Ok(Mutex::new(client))
1017 }
1018
1019 async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1020 let mut client = client.lock().await;
1021 let sql = format!("DROP TABLE IF EXISTS {};", table_name);
1022 client.execute(sqlx::query(&sql), &sql).await.unwrap();
1023 }
1024
1025 #[tokio::test]
1026 async fn test_mysql_crud() {
1027 maybe_skip_mysql_integration_test!();
1028 let key = "test_key".to_string();
1029 let value = "test_value".to_string();
1030
1031 let uuid = uuid::Uuid::new_v4().to_string();
1032 let table_name = "test_mysql_crud_greptime_metakv";
1033 let candidate_lease_ttl = Duration::from_secs(10);
1034 let meta_lease_ttl = Duration::from_secs(2);
1035
1036 let execution_timeout = Duration::from_secs(10);
1037 let idle_session_timeout = Duration::from_secs(0);
1038 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1039 .await
1040 .unwrap();
1041
1042 {
1043 let mut a = client.lock().await;
1044 let txn = a.transaction().await.unwrap();
1045 let mut executor = Executor::Txn(txn);
1046 let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1047 let query = sqlx::query(&raw_query);
1048 let _ = executor.query(query, &raw_query).await.unwrap();
1049 }
1050
1051 let (tx, _) = broadcast::channel(100);
1052 let mysql_election = MySqlElection {
1053 leader_value: "test_leader".to_string(),
1054 client,
1055 is_leader: AtomicBool::new(false),
1056 leader_infancy: AtomicBool::new(true),
1057 leader_watcher: tx,
1058 store_key_prefix: uuid,
1059 candidate_lease_ttl,
1060 meta_lease_ttl,
1061 sql_set: ElectionSqlFactory::new(table_name).build(),
1062 };
1063 let client = mysql_election.client.lock().await;
1064 let mut executor = Executor::Default(client);
1065 let res = mysql_election
1066 .put_value_with_lease(&key, &value, 10, &mut executor)
1067 .await
1068 .unwrap();
1069 assert!(res);
1070
1071 let lease = mysql_election
1072 .get_value_with_lease(&key, &mut executor)
1073 .await
1074 .unwrap()
1075 .unwrap();
1076 assert_eq!(lease.leader_value, value);
1077
1078 mysql_election
1079 .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
1080 .await
1081 .unwrap();
1082
1083 let res = mysql_election
1084 .delete_value(&key, &mut executor)
1085 .await
1086 .unwrap();
1087 assert!(res);
1088
1089 let res = mysql_election
1090 .get_value_with_lease(&key, &mut executor)
1091 .await
1092 .unwrap();
1093 assert!(res.is_none());
1094
1095 for i in 0..10 {
1096 let key = format!("test_key_{}", i);
1097 let value = format!("test_value_{}", i);
1098 mysql_election
1099 .put_value_with_lease(&key, &value, 10, &mut executor)
1100 .await
1101 .unwrap();
1102 }
1103
1104 let key_prefix = "test_key".to_string();
1105 let (res, _) = mysql_election
1106 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
1107 .await
1108 .unwrap();
1109 assert_eq!(res.len(), 10);
1110
1111 for i in 0..10 {
1112 let key = format!("test_key_{}", i);
1113 let res = mysql_election
1114 .delete_value(&key, &mut executor)
1115 .await
1116 .unwrap();
1117 assert!(res);
1118 }
1119
1120 let (res, current) = mysql_election
1121 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
1122 .await
1123 .unwrap();
1124 assert!(res.is_empty());
1125 assert!(current == Timestamp::default());
1126
1127 std::mem::drop(executor);
1129 drop_table(&mysql_election.client, table_name).await;
1130 }
1131
1132 async fn candidate(
1133 leader_value: String,
1134 candidate_lease_ttl: Duration,
1135 store_key_prefix: String,
1136 table_name: String,
1137 ) {
1138 let meta_lease_ttl = Duration::from_secs(2);
1139 let execution_timeout = Duration::from_secs(10);
1140 let idle_session_timeout = Duration::from_secs(0);
1141 let client =
1142 create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1143 .await
1144 .unwrap();
1145
1146 let (tx, _) = broadcast::channel(100);
1147 let mysql_election = MySqlElection {
1148 leader_value,
1149 client,
1150 is_leader: AtomicBool::new(false),
1151 leader_infancy: AtomicBool::new(true),
1152 leader_watcher: tx,
1153 store_key_prefix,
1154 candidate_lease_ttl,
1155 meta_lease_ttl,
1156 sql_set: ElectionSqlFactory::new(&table_name).build(),
1157 };
1158
1159 let node_info = MetasrvNodeInfo {
1160 addr: "test_addr".to_string(),
1161 version: "test_version".to_string(),
1162 git_commit: "test_git_commit".to_string(),
1163 start_time_ms: 0,
1164 cpus: 0,
1165 memory_bytes: 0,
1166 };
1167 mysql_election.register_candidate(&node_info).await.unwrap();
1168 }
1169
1170 #[tokio::test]
1171 async fn test_candidate_registration() {
1172 maybe_skip_mysql_integration_test!();
1173 let leader_value_prefix = "test_leader".to_string();
1174 let candidate_lease_ttl = Duration::from_secs(2);
1175 let execution_timeout = Duration::from_secs(10);
1176 let meta_lease_ttl = Duration::from_secs(2);
1177 let idle_session_timeout = Duration::from_secs(0);
1178 let uuid = uuid::Uuid::new_v4().to_string();
1179 let table_name = "test_candidate_registration_greptime_metakv";
1180 let mut handles = vec![];
1181 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1182 .await
1183 .unwrap();
1184
1185 for i in 0..10 {
1186 let leader_value = format!("{}{}", leader_value_prefix, i);
1187 let handle = tokio::spawn(candidate(
1188 leader_value,
1189 candidate_lease_ttl,
1190 uuid.clone(),
1191 table_name.to_string(),
1192 ));
1193 handles.push(handle);
1194 }
1195 tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
1197
1198 let (tx, _) = broadcast::channel(100);
1199 let leader_value = "test_leader".to_string();
1200 let mysql_election = MySqlElection {
1201 leader_value,
1202 client,
1203 is_leader: AtomicBool::new(false),
1204 leader_infancy: AtomicBool::new(true),
1205 leader_watcher: tx,
1206 store_key_prefix: uuid.clone(),
1207 candidate_lease_ttl,
1208 meta_lease_ttl,
1209 sql_set: ElectionSqlFactory::new(table_name).build(),
1210 };
1211
1212 let candidates = mysql_election.all_candidates().await.unwrap();
1213 assert_eq!(candidates.len(), 10);
1214
1215 for handle in handles {
1216 handle.abort();
1217 }
1218
1219 tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
1221 let candidates = mysql_election.all_candidates().await.unwrap();
1222 assert!(candidates.is_empty());
1223
1224 let client = mysql_election.client.lock().await;
1226 let mut executor = Executor::Default(client);
1227 for i in 0..10 {
1228 let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1229 let res = mysql_election
1230 .delete_value(&key, &mut executor)
1231 .await
1232 .unwrap();
1233 assert!(res);
1234 }
1235
1236 std::mem::drop(executor);
1238 drop_table(&mysql_election.client, table_name).await;
1239 }
1240
1241 async fn elected(
1242 election: &MySqlElection,
1243 table_name: &str,
1244 expected_lease: Option<Lease>,
1245 ) -> Result<()> {
1246 let mut client = election.client.lock().await;
1247 let txn = client.transaction().await.unwrap();
1248 let mut executor = Executor::Txn(txn);
1249 let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1250 let query = sqlx::query(&raw_query);
1251 let _ = executor.query(query, &raw_query).await.unwrap();
1252 election.elected(executor, expected_lease).await
1253 }
1254
1255 async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1256 let client = election.client.lock().await;
1257 let mut executor = Executor::Default(client);
1258 election
1259 .get_value_with_lease(&election.election_key(), &mut executor)
1260 .await
1261 .unwrap()
1262 }
1263
1264 #[tokio::test]
1265 async fn test_elected_with_incorrect_lease_fails() {
1266 maybe_skip_mysql_integration_test!();
1267 let leader_value = "test_leader".to_string();
1268 let candidate_lease_ttl = Duration::from_secs(5);
1269 let meta_lease_ttl = Duration::from_secs(2);
1270 let execution_timeout = Duration::from_secs(10);
1271 let idle_session_timeout = Duration::from_secs(0);
1272 let uuid = uuid::Uuid::new_v4().to_string();
1273 let table_name = "test_elected_failed_greptime_metakv";
1274 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1275 .await
1276 .unwrap();
1277
1278 let (tx, _) = broadcast::channel(100);
1279 let leader_mysql_election = MySqlElection {
1280 leader_value: leader_value.clone(),
1281 client,
1282 is_leader: AtomicBool::new(false),
1283 leader_infancy: AtomicBool::new(true),
1284 leader_watcher: tx,
1285 store_key_prefix: uuid,
1286 candidate_lease_ttl,
1287 meta_lease_ttl,
1288 sql_set: ElectionSqlFactory::new(table_name).build(),
1289 };
1290
1291 let incorrect_lease = Lease::default();
1292 let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
1293 .await
1294 .unwrap_err();
1295 assert_matches!(err, error::Error::LeaderLeaseChanged { .. });
1296 let lease = get_lease(&leader_mysql_election).await;
1297 assert!(lease.is_none());
1298 drop_table(&leader_mysql_election.client, table_name).await;
1299 }
1300
1301 #[tokio::test]
1302 async fn test_reelection_with_idle_session_timeout() {
1303 maybe_skip_mysql_integration_test!();
1304 let leader_value = "test_leader".to_string();
1305 let uuid = uuid::Uuid::new_v4().to_string();
1306 let table_name = "test_reelection_greptime_metakv";
1307 let candidate_lease_ttl = Duration::from_secs(5);
1308 let meta_lease_ttl = Duration::from_secs(5);
1309 let execution_timeout = Duration::from_secs(10);
1310 let idle_session_timeout = Duration::from_secs(2);
1311 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1312 .await
1313 .unwrap();
1314
1315 let (tx, _) = broadcast::channel(100);
1316 let leader_mysql_election = MySqlElection {
1317 leader_value: leader_value.clone(),
1318 client,
1319 is_leader: AtomicBool::new(false),
1320 leader_infancy: AtomicBool::new(true),
1321 leader_watcher: tx,
1322 store_key_prefix: uuid,
1323 candidate_lease_ttl,
1324 meta_lease_ttl,
1325 sql_set: ElectionSqlFactory::new(table_name).build(),
1326 };
1327
1328 elected(&leader_mysql_election, table_name, None)
1329 .await
1330 .unwrap();
1331 let lease = get_lease(&leader_mysql_election).await.unwrap();
1332 assert_eq!(lease.leader_value, leader_value);
1333 assert!(lease.expire_time > lease.current);
1334 assert!(leader_mysql_election.is_leader());
1335 tokio::time::sleep(Duration::from_millis(2100)).await;
1337 leader_mysql_election
1339 .client
1340 .lock()
1341 .await
1342 .query(sqlx::query("SELECT 1"), "SELECT 1")
1343 .await
1344 .unwrap_err();
1345 leader_mysql_election
1347 .client
1348 .lock()
1349 .await
1350 .reset_client()
1351 .await
1352 .unwrap();
1353
1354 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1356 .await
1357 .unwrap();
1358 let lease = get_lease(&leader_mysql_election).await.unwrap();
1359 assert_eq!(lease.leader_value, leader_value);
1360 assert!(lease.expire_time > lease.current);
1361 assert!(leader_mysql_election.is_leader());
1362 drop_table(&leader_mysql_election.client, table_name).await;
1363 }
1364
1365 #[tokio::test]
1366 async fn test_elected_and_step_down() {
1367 maybe_skip_mysql_integration_test!();
1368 let leader_value = "test_leader".to_string();
1369 let candidate_lease_ttl = Duration::from_secs(5);
1370 let meta_lease_ttl = Duration::from_secs(2);
1371 let execution_timeout = Duration::from_secs(10);
1372 let idle_session_timeout = Duration::from_secs(0);
1373 let uuid = uuid::Uuid::new_v4().to_string();
1374 let table_name = "test_elected_and_step_down_greptime_metakv";
1375 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1376 .await
1377 .unwrap();
1378
1379 let (tx, mut rx) = broadcast::channel(100);
1380 let leader_mysql_election = MySqlElection {
1381 leader_value: leader_value.clone(),
1382 client,
1383 is_leader: AtomicBool::new(false),
1384 leader_infancy: AtomicBool::new(true),
1385 leader_watcher: tx,
1386 store_key_prefix: uuid,
1387 candidate_lease_ttl,
1388 meta_lease_ttl,
1389 sql_set: ElectionSqlFactory::new(table_name).build(),
1390 };
1391
1392 elected(&leader_mysql_election, table_name, None)
1393 .await
1394 .unwrap();
1395 let lease = get_lease(&leader_mysql_election).await.unwrap();
1396 assert_eq!(lease.leader_value, leader_value);
1397 assert!(lease.expire_time > lease.current);
1398 assert!(leader_mysql_election.is_leader());
1399
1400 match rx.recv().await {
1401 Ok(LeaderChangeMessage::Elected(key)) => {
1402 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1403 assert_eq!(
1404 String::from_utf8_lossy(key.key()),
1405 leader_mysql_election.election_key()
1406 );
1407 assert_eq!(key.lease_id(), i64::default());
1408 assert_eq!(key.revision(), i64::default());
1409 }
1410 _ => panic!("Expected LeaderChangeMessage::Elected"),
1411 }
1412
1413 leader_mysql_election
1414 .step_down_without_lock()
1415 .await
1416 .unwrap();
1417 let lease = get_lease(&leader_mysql_election).await.unwrap();
1418 assert_eq!(lease.leader_value, leader_value);
1419 assert!(!leader_mysql_election.is_leader());
1420
1421 match rx.recv().await {
1422 Ok(LeaderChangeMessage::StepDown(key)) => {
1423 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1424 assert_eq!(
1425 String::from_utf8_lossy(key.key()),
1426 leader_mysql_election.election_key()
1427 );
1428 assert_eq!(key.lease_id(), i64::default());
1429 assert_eq!(key.revision(), i64::default());
1430 }
1431 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1432 }
1433
1434 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1435 .await
1436 .unwrap();
1437 let lease = get_lease(&leader_mysql_election).await.unwrap();
1438 assert_eq!(lease.leader_value, leader_value);
1439 assert!(lease.expire_time > lease.current);
1440 assert!(leader_mysql_election.is_leader());
1441
1442 match rx.recv().await {
1443 Ok(LeaderChangeMessage::Elected(key)) => {
1444 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1445 assert_eq!(
1446 String::from_utf8_lossy(key.key()),
1447 leader_mysql_election.election_key()
1448 );
1449 assert_eq!(key.lease_id(), i64::default());
1450 assert_eq!(key.revision(), i64::default());
1451 }
1452 _ => panic!("Expected LeaderChangeMessage::Elected"),
1453 }
1454
1455 drop_table(&leader_mysql_election.client, table_name).await;
1456 }
1457
1458 #[tokio::test]
1459 async fn test_campaign() {
1460 maybe_skip_mysql_integration_test!();
1461 let leader_value = "test_leader".to_string();
1462 let uuid = uuid::Uuid::new_v4().to_string();
1463 let table_name = "test_leader_action_greptime_metakv";
1464 let candidate_lease_ttl = Duration::from_secs(5);
1465 let meta_lease_ttl = Duration::from_secs(2);
1466 let execution_timeout = Duration::from_secs(10);
1467 let idle_session_timeout = Duration::from_secs(0);
1468 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1469 .await
1470 .unwrap();
1471
1472 let (tx, mut rx) = broadcast::channel(100);
1473 let leader_mysql_election = MySqlElection {
1474 leader_value: leader_value.clone(),
1475 client,
1476 is_leader: AtomicBool::new(false),
1477 leader_infancy: AtomicBool::new(true),
1478 leader_watcher: tx,
1479 store_key_prefix: uuid,
1480 candidate_lease_ttl,
1481 meta_lease_ttl,
1482 sql_set: ElectionSqlFactory::new(table_name).build(),
1483 };
1484
1485 leader_mysql_election.do_campaign().await.unwrap();
1487 let lease = get_lease(&leader_mysql_election).await.unwrap();
1488 assert_eq!(lease.leader_value, leader_value);
1489 assert!(lease.expire_time > lease.current);
1490 assert!(leader_mysql_election.is_leader());
1491
1492 match rx.recv().await {
1493 Ok(LeaderChangeMessage::Elected(key)) => {
1494 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1495 assert_eq!(
1496 String::from_utf8_lossy(key.key()),
1497 leader_mysql_election.election_key()
1498 );
1499 assert_eq!(key.lease_id(), i64::default());
1500 assert_eq!(key.revision(), i64::default());
1501 }
1502 _ => panic!("Expected LeaderChangeMessage::Elected"),
1503 }
1504
1505 leader_mysql_election.do_campaign().await.unwrap();
1507 let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1508 assert_eq!(lease.leader_value, leader_value);
1509 assert!(new_lease.expire_time > lease.expire_time);
1511 assert!(new_lease.expire_time > new_lease.current);
1512 assert!(leader_mysql_election.is_leader());
1513
1514 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1516 leader_mysql_election.do_campaign().await.unwrap();
1517 let lease = get_lease(&leader_mysql_election).await.unwrap();
1518 assert_eq!(lease.leader_value, leader_value);
1519 assert!(lease.expire_time <= lease.current);
1520 assert!(!leader_mysql_election.is_leader());
1521
1522 match rx.recv().await {
1523 Ok(LeaderChangeMessage::StepDown(key)) => {
1524 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1525 assert_eq!(
1526 String::from_utf8_lossy(key.key()),
1527 leader_mysql_election.election_key()
1528 );
1529 assert_eq!(key.lease_id(), i64::default());
1530 assert_eq!(key.revision(), i64::default());
1531 }
1532 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1533 }
1534
1535 leader_mysql_election.do_campaign().await.unwrap();
1537 let lease = get_lease(&leader_mysql_election).await.unwrap();
1538 assert_eq!(lease.leader_value, leader_value);
1539 assert!(lease.expire_time > lease.current);
1540 assert!(leader_mysql_election.is_leader());
1541
1542 match rx.recv().await {
1543 Ok(LeaderChangeMessage::Elected(key)) => {
1544 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1545 assert_eq!(
1546 String::from_utf8_lossy(key.key()),
1547 leader_mysql_election.election_key()
1548 );
1549 assert_eq!(key.lease_id(), i64::default());
1550 assert_eq!(key.revision(), i64::default());
1551 }
1552 _ => panic!("Expected LeaderChangeMessage::Elected"),
1553 }
1554
1555 {
1557 let client = leader_mysql_election.client.lock().await;
1558 let mut executor = Executor::Default(client);
1559 leader_mysql_election
1560 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1561 .await
1562 .unwrap();
1563 }
1564 leader_mysql_election.do_campaign().await.unwrap();
1565 let res = get_lease(&leader_mysql_election).await;
1566 assert!(res.is_none());
1567 assert!(!leader_mysql_election.is_leader());
1568
1569 match rx.recv().await {
1570 Ok(LeaderChangeMessage::StepDown(key)) => {
1571 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1572 assert_eq!(
1573 String::from_utf8_lossy(key.key()),
1574 leader_mysql_election.election_key()
1575 );
1576 assert_eq!(key.lease_id(), i64::default());
1577 assert_eq!(key.revision(), i64::default());
1578 }
1579 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1580 }
1581
1582 leader_mysql_election.do_campaign().await.unwrap();
1584 let lease = get_lease(&leader_mysql_election).await.unwrap();
1585 assert_eq!(lease.leader_value, leader_value);
1586 assert!(lease.expire_time > lease.current);
1587 assert!(leader_mysql_election.is_leader());
1588
1589 match rx.recv().await {
1590 Ok(LeaderChangeMessage::Elected(key)) => {
1591 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1592 assert_eq!(
1593 String::from_utf8_lossy(key.key()),
1594 leader_mysql_election.election_key()
1595 );
1596 assert_eq!(key.lease_id(), i64::default());
1597 assert_eq!(key.revision(), i64::default());
1598 }
1599 _ => panic!("Expected LeaderChangeMessage::Elected"),
1600 }
1601
1602 let another_leader_key = "another_leader";
1604 {
1605 let client = leader_mysql_election.client.lock().await;
1606 let mut executor = Executor::Default(client);
1607 leader_mysql_election
1608 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1609 .await
1610 .unwrap();
1611 leader_mysql_election
1612 .put_value_with_lease(
1613 &leader_mysql_election.election_key(),
1614 another_leader_key,
1615 10,
1616 &mut executor,
1617 )
1618 .await
1619 .unwrap();
1620 }
1621 leader_mysql_election.do_campaign().await.unwrap();
1622 let lease = get_lease(&leader_mysql_election).await.unwrap();
1623 assert_eq!(lease.leader_value, another_leader_key);
1625 assert!(lease.expire_time > lease.current);
1626 assert!(!leader_mysql_election.is_leader());
1627
1628 match rx.recv().await {
1629 Ok(LeaderChangeMessage::StepDown(key)) => {
1630 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1631 assert_eq!(
1632 String::from_utf8_lossy(key.key()),
1633 leader_mysql_election.election_key()
1634 );
1635 assert_eq!(key.lease_id(), i64::default());
1636 assert_eq!(key.revision(), i64::default());
1637 }
1638 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1639 }
1640
1641 drop_table(&leader_mysql_election.client, table_name).await;
1642 }
1643
1644 #[tokio::test]
1645 async fn test_follower_action() {
1646 maybe_skip_mysql_integration_test!();
1647 common_telemetry::init_default_ut_logging();
1648 let candidate_lease_ttl = Duration::from_secs(5);
1649 let meta_lease_ttl = Duration::from_secs(1);
1650 let execution_timeout = Duration::from_secs(10);
1651 let idle_session_timeout = Duration::from_secs(0);
1652 let uuid = uuid::Uuid::new_v4().to_string();
1653 let table_name = "test_follower_action_greptime_metakv";
1654
1655 let follower_client =
1656 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1657 .await
1658 .unwrap();
1659 let (tx, mut rx) = broadcast::channel(100);
1660 let follower_mysql_election = MySqlElection {
1661 leader_value: "test_follower".to_string(),
1662 client: follower_client,
1663 is_leader: AtomicBool::new(false),
1664 leader_infancy: AtomicBool::new(true),
1665 leader_watcher: tx,
1666 store_key_prefix: uuid.clone(),
1667 candidate_lease_ttl,
1668 meta_lease_ttl,
1669 sql_set: ElectionSqlFactory::new(table_name).build(),
1670 };
1671
1672 let leader_client =
1673 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1674 .await
1675 .unwrap();
1676 let (tx, _) = broadcast::channel(100);
1677 let leader_mysql_election = MySqlElection {
1678 leader_value: "test_leader".to_string(),
1679 client: leader_client,
1680 is_leader: AtomicBool::new(false),
1681 leader_infancy: AtomicBool::new(true),
1682 leader_watcher: tx,
1683 store_key_prefix: uuid,
1684 candidate_lease_ttl,
1685 meta_lease_ttl,
1686 sql_set: ElectionSqlFactory::new(table_name).build(),
1687 };
1688
1689 leader_mysql_election.do_campaign().await.unwrap();
1690
1691 follower_mysql_election.do_campaign().await.unwrap();
1693
1694 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1696 follower_mysql_election.do_campaign().await.unwrap();
1697 assert!(follower_mysql_election.is_leader());
1698
1699 match rx.recv().await {
1700 Ok(LeaderChangeMessage::Elected(key)) => {
1701 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1702 assert_eq!(
1703 String::from_utf8_lossy(key.key()),
1704 follower_mysql_election.election_key()
1705 );
1706 assert_eq!(key.lease_id(), i64::default());
1707 assert_eq!(key.revision(), i64::default());
1708 }
1709 _ => panic!("Expected LeaderChangeMessage::Elected"),
1710 }
1711
1712 drop_table(&follower_mysql_election.client, table_name).await;
1713 }
1714
1715 #[tokio::test]
1716 async fn test_wait_timeout() {
1717 maybe_skip_mysql_integration_test!();
1718 common_telemetry::init_default_ut_logging();
1719 let execution_timeout = Duration::from_secs(10);
1720 let idle_session_timeout = Duration::from_secs(1);
1721
1722 let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1723 .await
1724 .unwrap();
1725 tokio::time::sleep(Duration::from_millis(1100)).await;
1726 let err = client
1728 .lock()
1729 .await
1730 .query(sqlx::query("SELECT 1"), "SELECT 1")
1731 .await
1732 .unwrap_err();
1733 assert_matches!(err, error::Error::MySqlExecution { .. });
1734 client.lock().await.reset_client().await.unwrap();
1736 let _ = client
1737 .lock()
1738 .await
1739 .query(sqlx::query("SELECT 1"), "SELECT 1")
1740 .await
1741 .unwrap();
1742 }
1743}