1use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_telemetry::{error, warn};
20use common_time::Timestamp;
21use snafu::{ensure, OptionExt, ResultExt};
22use sqlx::mysql::{MySqlArguments, MySqlRow};
23use sqlx::pool::PoolConnection;
24use sqlx::query::Query;
25use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
26use tokio::sync::{broadcast, Mutex, MutexGuard};
27use tokio::time::MissedTickBehavior;
28
29use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
30use crate::election::{
31 listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
32 CANDIDATES_ROOT, ELECTION_KEY,
33};
34use crate::error::{
35 AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36 LeaderLeaseChangedSnafu, LeaderLeaseExpiredSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result,
37 SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
40
/// Renders the SQL statements used by the MySQL election backend for one table.
///
/// The table name is interpolated into the statement text with `format!` (it is not a
/// bound parameter), so it must come from trusted configuration.
struct ElectionSqlFactory<'a> {
    /// Name of the table that stores election and candidate keys.
    table_name: &'a str,
}
44
/// Pre-rendered SQL statements for one election table, produced by `ElectionSqlFactory::build`.
struct ElectionSqlSet {
    /// `SELECT * ... FOR UPDATE` issued at the start of a campaign transaction to lock the rows.
    campaign: String,
    /// Upsert of `k` with `v = value || LEASE_SEP || (server NOW(4) + ttl seconds)`.
    put_value_with_lease: String,
    /// Compare-and-set rewrite of `v` with a new expire time; only matches when `v` equals the
    /// previously read value.
    update_value_with_lease: String,
    /// Reads `v` and the server's formatted `NOW(4)` for an exact key.
    get_value_with_lease: String,
    /// Reads `v` and the server's formatted `NOW(4)` for every key matching a `LIKE` pattern.
    get_value_with_lease_by_prefix: String,
    /// Deletes the row for an exact key.
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90 fn new(table_name: &'a str) -> Self {
91 Self { table_name }
92 }
93
94 fn build(self) -> ElectionSqlSet {
95 ElectionSqlSet {
96 campaign: self.campaign_sql(),
97 put_value_with_lease: self.put_value_with_lease_sql(),
98 update_value_with_lease: self.update_value_with_lease_sql(),
99 get_value_with_lease: self.get_value_with_lease_sql(),
100 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101 delete_value: self.delete_value_sql(),
102 }
103 }
104
105 fn campaign_sql(&self) -> String {
108 format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109 }
110
111 fn put_value_with_lease_sql(&self) -> String {
112 format!(
113 r#"
114 INSERT INTO `{}` (k, v) VALUES (
115 ?,
116 CONCAT(
117 ?,
118 '{}',
119 DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120 )
121 )
122 ON DUPLICATE KEY UPDATE v = VALUES(v);
123 "#,
124 self.table_name, LEASE_SEP
125 )
126 }
127
128 fn update_value_with_lease_sql(&self) -> String {
129 format!(
130 r#"UPDATE `{}`
131 SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132 WHERE k = ? AND v = ?"#,
133 self.table_name, LEASE_SEP
134 )
135 }
136
137 fn get_value_with_lease_sql(&self) -> String {
138 format!(
139 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140 self.table_name
141 )
142 }
143
144 fn get_value_with_lease_by_prefix_sql(&self) -> String {
145 format!(
146 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147 self.table_name
148 )
149 }
150
151 fn delete_value_sql(&self) -> String {
152 format!("DELETE FROM {} WHERE k = ?;", self.table_name)
153 }
154}
155
/// Where a statement is executed.
enum Executor<'a> {
    /// Directly on the connection held by the locked `ElectionMysqlClient`.
    Default(MutexGuard<'a, ElectionMysqlClient>),
    /// Inside an open transaction; changes are applied only once `commit` is called.
    Txn(TransactionWithExecutionTimeout<'a>),
}
160
161impl Executor<'_> {
162 async fn query(
163 &mut self,
164 query: Query<'_, MySql, MySqlArguments>,
165 sql: &str,
166 ) -> Result<Vec<MySqlRow>> {
167 match self {
168 Executor::Default(client) => client.query(query, sql).await,
169 Executor::Txn(txn) => txn.query(query, sql).await,
170 }
171 }
172
173 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174 match self {
175 Executor::Default(client) => client.execute(query, sql).await,
176 Executor::Txn(txn) => txn.execute(query, sql).await,
177 }
178 }
179
180 async fn commit(self) -> Result<()> {
181 match self {
182 Executor::Txn(txn) => txn.commit().await,
183 _ => Ok(()),
184 }
185 }
186}
187
/// A single MySQL connection used by the election, plus the settings applied to it.
pub struct ElectionMysqlClient {
    /// Connection currently in use; `None` until `maybe_init_client` acquires one from `pool`
    /// and reset to `None` by `reset_client`.
    current: Option<PoolConnection<MySql>>,
    /// Pool that connections are acquired from.
    pool: MySqlPool,

    /// Client-side deadline wrapped (via `tokio::time::timeout`) around every statement.
    execution_timeout: Duration,

    /// Sent as the session variable `max_execution_time`, in milliseconds.
    max_execution_time: Duration,

    /// Sent as the session variable `innodb_lock_wait_timeout`, in seconds.
    /// (The field name is spelled `innode`.)
    innode_lock_wait_timeout: Duration,

    /// Sent as the session variable `wait_timeout`, in seconds; zero leaves it unchanged.
    wait_timeout: Duration,

    /// Name of the table that stores election and candidate keys.
    table_for_election: String,
}
221
222impl ElectionMysqlClient {
223 pub fn new(
224 pool: MySqlPool,
225 execution_timeout: Duration,
226 max_execution_time: Duration,
227 innode_lock_wait_timeout: Duration,
228 wait_timeout: Duration,
229 table_for_election: &str,
230 ) -> Self {
231 Self {
232 current: None,
233 pool,
234 execution_timeout,
235 max_execution_time,
236 innode_lock_wait_timeout,
237 wait_timeout,
238 table_for_election: table_for_election.to_string(),
239 }
240 }
241
242 fn create_table_sql(&self) -> String {
243 format!(
244 r#"
245 CREATE TABLE IF NOT EXISTS `{}` (
246 k VARBINARY(3072) PRIMARY KEY,
247 v BLOB
248 );"#,
249 self.table_for_election
250 )
251 }
252
253 fn insert_once_sql(&self) -> String {
254 format!(
255 "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256 self.table_for_election
257 )
258 }
259
260 fn check_version_sql(&self) -> String {
261 "SELECT @@version;".to_string()
262 }
263
264 async fn reset_client(&mut self) -> Result<()> {
265 self.current = None;
266 self.maybe_init_client().await
267 }
268
269 async fn ensure_table_exists(&mut self) -> Result<()> {
270 let create_table_sql = self.create_table_sql();
271 let query = sqlx::query(&create_table_sql);
272 self.execute(query, &create_table_sql).await?;
273 let insert_once_sql = self.insert_once_sql();
275 let query = sqlx::query(&insert_once_sql);
276 self.execute(query, &insert_once_sql).await?;
277 Ok(())
278 }
279
280 async fn maybe_init_client(&mut self) -> Result<()> {
281 if self.current.is_none() {
282 let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284 self.current = Some(client);
285 let (query, sql) = if !self.wait_timeout.is_zero() {
286 let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287 (
288 sqlx::query(sql)
289 .bind(self.wait_timeout.as_secs())
290 .bind(self.innode_lock_wait_timeout.as_secs())
291 .bind(self.max_execution_time.as_millis() as u64),
292 sql,
293 )
294 } else {
295 let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296 (
297 sqlx::query(sql)
298 .bind(self.innode_lock_wait_timeout.as_secs())
299 .bind(self.max_execution_time.as_millis() as u64),
300 sql,
301 )
302 };
303 self.set_session_isolation_level().await?;
304 self.execute(query, sql).await?;
305 self.check_version(&self.check_version_sql()).await?;
306 }
307 Ok(())
308 }
309
310 async fn set_session_isolation_level(&mut self) -> Result<()> {
315 let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316 let query = sqlx::query(sql);
317 self.execute(query, sql).await?;
318 Ok(())
319 }
320
321 async fn check_version(&mut self, sql: &str) -> Result<()> {
326 let query = sqlx::query(sql);
328 let client = self.current.as_mut().unwrap();
330 let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331 .await
332 .map_err(|_| {
333 SqlExecutionTimeoutSnafu {
334 sql,
335 duration: self.execution_timeout,
336 }
337 .build()
338 })?;
339 match row {
340 Ok(row) => {
341 let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342 if !version.starts_with("8.0") && !version.starts_with("5.7") {
343 warn!(
344 "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345 version
346 );
347 }
348 }
349 Err(e) => {
350 warn!(e; "Failed to check MySQL version through sql: {}", sql);
351 }
352 }
353 Ok(())
354 }
355
356 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361 let client = self.current.as_mut().unwrap();
363 let future = query.execute(&mut **client);
364 let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365 .await
366 .map_err(|_| {
367 SqlExecutionTimeoutSnafu {
368 sql,
369 duration: self.execution_timeout,
370 }
371 .build()
372 })?
373 .context(MySqlExecutionSnafu { sql })?
374 .rows_affected();
375 Ok(rows_affected)
376 }
377
378 async fn query(
383 &mut self,
384 query: Query<'_, MySql, MySqlArguments>,
385 sql: &str,
386 ) -> Result<Vec<MySqlRow>> {
387 let client = self.current.as_mut().unwrap();
389 let future = query.fetch_all(&mut **client);
390 tokio::time::timeout(self.execution_timeout, future)
391 .await
392 .map_err(|_| {
393 SqlExecutionTimeoutSnafu {
394 sql,
395 duration: self.execution_timeout,
396 }
397 .build()
398 })?
399 .context(MySqlExecutionSnafu { sql })
400 }
401
402 async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403 use sqlx::Acquire;
404 let client = self.current.as_mut().unwrap();
405 let transaction = client
406 .begin()
407 .await
408 .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410 Ok(TransactionWithExecutionTimeout {
411 transaction,
412 execution_timeout: self.execution_timeout,
413 })
414 }
415}
416
/// An open MySQL transaction whose statements and `COMMIT` are each bounded by a timeout.
struct TransactionWithExecutionTimeout<'a> {
    /// The underlying sqlx transaction, borrowed from the client's connection.
    transaction: MySqlTransaction<'a>,
    /// Deadline applied (via `tokio::time::timeout`) to each statement and to `COMMIT`.
    execution_timeout: Duration,
}
421
422impl TransactionWithExecutionTimeout<'_> {
423 async fn query(
424 &mut self,
425 query: Query<'_, MySql, MySqlArguments>,
426 sql: &str,
427 ) -> Result<Vec<MySqlRow>> {
428 let res = tokio::time::timeout(
429 self.execution_timeout,
430 query.fetch_all(&mut *self.transaction),
431 )
432 .await
433 .map_err(|_| {
434 SqlExecutionTimeoutSnafu {
435 sql,
436 duration: self.execution_timeout,
437 }
438 .build()
439 })?
440 .context(MySqlExecutionSnafu { sql })?;
441 Ok(res)
442 }
443
444 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445 let res = tokio::time::timeout(
446 self.execution_timeout,
447 query.execute(&mut *self.transaction),
448 )
449 .await
450 .map_err(|_| {
451 SqlExecutionTimeoutSnafu {
452 sql,
453 duration: self.execution_timeout,
454 }
455 .build()
456 })?
457 .context(MySqlExecutionSnafu { sql })?;
458 Ok(res.rows_affected())
459 }
460
461 async fn commit(self) -> Result<()> {
462 tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463 .await
464 .map_err(|_| {
465 SqlExecutionTimeoutSnafu {
466 sql: "COMMIT",
467 duration: self.execution_timeout,
468 }
469 .build()
470 })?
471 .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472 Ok(())
473 }
474}
475
/// Leader election backed by a MySQL table.
pub struct MySqlElection {
    /// This node's identity: written into the election key when it wins, and appended to the
    /// candidate root to form its candidate key.
    leader_value: String,
    /// Shared connection; the tokio `Mutex` serializes all statements issued by this node.
    client: Mutex<ElectionMysqlClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Consumed by `in_leader_infancy` (true -> false). NOTE(review): presumably set to true by
    /// `send_leader_change_and_set_flags` when leadership is gained — confirm.
    leader_infancy: AtomicBool,
    /// Broadcasts `LeaderChangeMessage`s to subscribers.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix for the election key and the candidate keys.
    store_key_prefix: String,
    /// Lease length for candidate registrations.
    candidate_lease_ttl: Duration,
    /// Lease length for the leader key.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements for the election table.
    sql_set: ElectionSqlSet,
}
488
489impl MySqlElection {
490 pub async fn with_mysql_client(
491 leader_value: String,
492 mut client: ElectionMysqlClient,
493 store_key_prefix: String,
494 candidate_lease_ttl: Duration,
495 meta_lease_ttl: Duration,
496 table_name: &str,
497 ) -> Result<ElectionRef> {
498 let sql_factory = ElectionSqlFactory::new(table_name);
499 client.maybe_init_client().await?;
500 client.ensure_table_exists().await?;
501 let tx = listen_leader_change(leader_value.clone());
502 Ok(Arc::new(Self {
503 leader_value,
504 client: Mutex::new(client),
505 is_leader: AtomicBool::new(false),
506 leader_infancy: AtomicBool::new(false),
507 leader_watcher: tx,
508 store_key_prefix,
509 candidate_lease_ttl,
510 meta_lease_ttl,
511 sql_set: sql_factory.build(),
512 }))
513 }
514
515 fn election_key(&self) -> String {
516 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517 }
518
519 fn candidate_root(&self) -> String {
520 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521 }
522
523 fn candidate_key(&self) -> String {
524 format!("{}{}", self.candidate_root(), self.leader_value)
525 }
526
527 async fn maybe_init_client(&self) -> Result<()> {
528 let mut client = self.client.lock().await;
529 client.maybe_init_client().await?;
530 Ok(())
531 }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536 type Leader = LeaderValue;
537
538 fn is_leader(&self) -> bool {
539 self.is_leader.load(Ordering::Relaxed)
540 }
541
542 fn in_leader_infancy(&self) -> bool {
543 self.leader_infancy
544 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545 .is_ok()
546 }
547
548 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549 self.maybe_init_client().await?;
550 let key = self.candidate_key();
551 let node_info =
552 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553 input: format!("{node_info:?}"),
554 })?;
555
556 {
557 let client = self.client.lock().await;
558 let mut executor = Executor::Default(client);
559 let res = self
560 .put_value_with_lease(
561 &key,
562 &node_info,
563 self.candidate_lease_ttl.as_secs(),
564 &mut executor,
565 )
566 .await?;
567 if !res {
569 warn!("Candidate already registered, update the lease");
570 self.delete_value(&key, &mut executor).await?;
571 self.put_value_with_lease(
572 &key,
573 &node_info,
574 self.candidate_lease_ttl.as_secs(),
575 &mut executor,
576 )
577 .await?;
578 }
579 }
580
581 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583 loop {
584 let _ = keep_alive_interval.tick().await;
585 let client = self.client.lock().await;
586 let mut executor = Executor::Default(client);
587 let lease = self
588 .get_value_with_lease(&key, &mut executor)
589 .await?
590 .unwrap_or_default();
591
592 ensure!(
593 lease.expire_time > lease.current,
594 UnexpectedSnafu {
595 violated: format!(
596 "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597 lease.expire_time,
598 lease.current,
599 String::from_utf8_lossy(&key.into_bytes())
600 ),
601 }
602 );
603
604 self.update_value_with_lease(
605 &key,
606 &lease.origin,
607 &node_info,
608 self.candidate_lease_ttl.as_secs(),
609 &mut executor,
610 )
611 .await?;
612 std::mem::drop(executor);
613 }
614 }
615
616 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617 self.maybe_init_client().await?;
618 let key_prefix = self.candidate_root();
619 let client = self.client.lock().await;
620 let mut executor = Executor::Default(client);
621 let (mut candidates, current) = self
622 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623 .await?;
624 candidates.retain(|c| c.1 > current);
626 let mut valid_candidates = Vec::with_capacity(candidates.len());
627 for (c, _) in candidates {
628 let node_info: MetasrvNodeInfo =
629 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630 input: format!("{:?}", c),
631 })?;
632 valid_candidates.push(node_info);
633 }
634 Ok(valid_candidates)
635 }
636
637 async fn campaign(&self) -> Result<()> {
638 self.maybe_init_client().await?;
639 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641 loop {
642 self.do_campaign().await?;
643 keep_alive_interval.tick().await;
644 }
645 }
646
647 async fn reset_campaign(&self) {
648 if let Err(err) = self.client.lock().await.reset_client().await {
649 error!(err; "Failed to reset client");
650 }
651 }
652
653 async fn leader(&self) -> Result<Self::Leader> {
654 self.maybe_init_client().await?;
655 if self.is_leader.load(Ordering::Relaxed) {
656 Ok(self.leader_value.as_bytes().into())
657 } else {
658 let key = self.election_key();
659
660 let client = self.client.lock().await;
661 let mut executor = Executor::Default(client);
662 if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
663 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
664 Ok(lease.leader_value.as_bytes().into())
665 } else {
666 NoLeaderSnafu.fail()
667 }
668 }
669 }
670
671 async fn resign(&self) -> Result<()> {
672 todo!()
673 }
674
675 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
676 self.leader_watcher.subscribe()
677 }
678}
679
680impl MySqlElection {
681 async fn get_value_with_lease(
683 &self,
684 key: &str,
685 executor: &mut Executor<'_>,
686 ) -> Result<Option<Lease>> {
687 let key = key.as_bytes();
688 let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
689 let res = executor
690 .query(query, &self.sql_set.get_value_with_lease)
691 .await?;
692
693 if res.is_empty() {
694 return Ok(None);
695 }
696 let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
698 let current_time = match Timestamp::from_str(¤t_time_str, None) {
699 Ok(ts) => ts,
700 Err(_) => UnexpectedSnafu {
701 violated: format!("Invalid timestamp: {}", current_time_str),
702 }
703 .fail()?,
704 };
705 let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
707 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
708
709 Ok(Some(Lease {
710 leader_value: value,
711 expire_time,
712 current: current_time,
713 origin: value_and_expire_time.to_string(),
714 }))
715 }
716
717 async fn get_value_with_lease_by_prefix(
719 &self,
720 key_prefix: &str,
721 executor: &mut Executor<'_>,
722 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
723 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
724 let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
725 let res = executor
726 .query(query, &self.sql_set.get_value_with_lease_by_prefix)
727 .await?;
728
729 let mut values_with_leases = vec![];
730 let mut current = Timestamp::default();
731 for row in res {
732 let current_time_str = row.try_get(1).unwrap_or_default();
733 current = match Timestamp::from_str(current_time_str, None) {
734 Ok(ts) => ts,
735 Err(_) => UnexpectedSnafu {
736 violated: format!("Invalid timestamp: {}", current_time_str),
737 }
738 .fail()?,
739 };
740
741 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
742 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
743
744 values_with_leases.push((value, expire_time));
745 }
746 Ok((values_with_leases, current))
747 }
748
749 async fn update_value_with_lease(
750 &self,
751 key: &str,
752 prev: &str,
753 updated: &str,
754 lease_ttl: u64,
755 executor: &mut Executor<'_>,
756 ) -> Result<()> {
757 let key = key.as_bytes();
758 let prev = prev.as_bytes();
759 let updated = updated.as_bytes();
760
761 let query = sqlx::query(&self.sql_set.update_value_with_lease)
762 .bind(updated)
763 .bind(lease_ttl as f64)
764 .bind(key)
765 .bind(prev);
766 let res = executor
767 .execute(query, &self.sql_set.update_value_with_lease)
768 .await?;
769
770 ensure!(
771 res == 1,
772 UnexpectedSnafu {
773 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
774 }
775 );
776
777 Ok(())
778 }
779
780 async fn put_value_with_lease(
782 &self,
783 key: &str,
784 value: &str,
785 lease_ttl_secs: u64,
786 executor: &mut Executor<'_>,
787 ) -> Result<bool> {
788 let key = key.as_bytes();
789 let lease_ttl_secs = lease_ttl_secs as f64;
790 let query = sqlx::query(&self.sql_set.put_value_with_lease)
791 .bind(key)
792 .bind(value)
793 .bind(lease_ttl_secs);
794 let res = executor
795 .execute(query, &self.sql_set.put_value_with_lease)
796 .await?;
797 Ok(res == 1)
798 }
799
800 async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
803 let key = key.as_bytes();
804 let query = sqlx::query(&self.sql_set.delete_value).bind(key);
805 let res = executor.execute(query, &self.sql_set.delete_value).await?;
806
807 Ok(res == 1)
808 }
809
810 async fn do_campaign(&self) -> Result<()> {
813 let lease = {
814 let client = self.client.lock().await;
815 let mut executor = Executor::Default(client);
816 self.get_value_with_lease(&self.election_key(), &mut executor)
817 .await?
818 };
819
820 let is_leader = self.is_leader();
821 let is_current_leader = lease
824 .as_ref()
825 .map(|lease| lease.leader_value == self.leader_value)
826 .unwrap_or(false);
827 match (self.lease_check(&lease), is_leader, is_current_leader) {
828 (Ok(_), true, true) => {
830 let mut client = self.client.lock().await;
831 let txn = client.transaction().await?;
832 let mut executor = Executor::Txn(txn);
833 let query = sqlx::query(&self.sql_set.campaign);
834 executor.query(query, &self.sql_set.campaign).await?;
835 self.renew_lease(executor, lease.unwrap()).await?;
837 }
838 (Err(err), true, _) => {
841 warn!(err; "Leader lease expired, step down...");
842 self.step_down_without_lock().await?;
843 }
844 (Ok(_), true, false) => {
845 warn!("Leader lease expired, step down...");
846 self.step_down_without_lock().await?;
847 }
848 (Err(err), false, _) => {
850 warn!(err; "Leader lease expired, elect myself.");
851 let mut client = self.client.lock().await;
852 let txn = client.transaction().await?;
853 let mut executor = Executor::Txn(txn);
854 let query = sqlx::query(&self.sql_set.campaign);
855 executor.query(query, &self.sql_set.campaign).await?;
856 self.elected(executor, lease).await?;
857 }
858 (Ok(_), false, true) => {
861 warn!("I should be the leader, but I don't think so. Something went wrong.");
862 let mut client = self.client.lock().await;
863 let txn = client.transaction().await?;
864 let mut executor = Executor::Txn(txn);
865 let query = sqlx::query(&self.sql_set.campaign);
866 executor.query(query, &self.sql_set.campaign).await?;
867 self.renew_lease(executor, lease.unwrap()).await?;
869 }
870 (Ok(_), false, false) => {}
872 }
873 Ok(())
874 }
875
876 async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
878 let key = self.election_key();
879 self.update_value_with_lease(
880 &key,
881 &lease.origin,
882 &self.leader_value,
883 self.meta_lease_ttl.as_secs(),
884 &mut executor,
885 )
886 .await?;
887 executor.commit().await?;
888
889 if !self.is_leader() {
890 let key = self.election_key();
891 let leader_key = RdsLeaderKey {
892 name: self.leader_value.clone().into_bytes(),
893 key: key.clone().into_bytes(),
894 ..Default::default()
895 };
896 send_leader_change_and_set_flags(
897 &self.is_leader,
898 &self.leader_infancy,
899 &self.leader_watcher,
900 LeaderChangeMessage::Elected(Arc::new(leader_key)),
901 );
902 }
903
904 Ok(())
905 }
906
907 fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
916 let lease = lease.as_ref().context(NoLeaderSnafu)?;
917 ensure!(lease.expire_time > lease.current, LeaderLeaseExpiredSnafu);
919 Ok(lease.clone())
921 }
922
923 async fn step_down_without_lock(&self) -> Result<()> {
925 let key = self.election_key().into_bytes();
926 let leader_key = RdsLeaderKey {
927 name: self.leader_value.clone().into_bytes(),
928 key: key.clone(),
929 ..Default::default()
930 };
931 send_leader_change_and_set_flags(
932 &self.is_leader,
933 &self.leader_infancy,
934 &self.leader_watcher,
935 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
936 );
937 Ok(())
938 }
939
940 async fn elected(
943 &self,
944 mut executor: Executor<'_>,
945 expected_lease: Option<Lease>,
946 ) -> Result<()> {
947 let key = self.election_key();
948 let leader_key = RdsLeaderKey {
949 name: self.leader_value.clone().into_bytes(),
950 key: key.clone().into_bytes(),
951 ..Default::default()
952 };
953 let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
954 ensure!(
955 expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
956 LeaderLeaseChangedSnafu
957 );
958 self.delete_value(&key, &mut executor).await?;
959 self.put_value_with_lease(
960 &key,
961 &self.leader_value,
962 self.meta_lease_ttl.as_secs(),
963 &mut executor,
964 )
965 .await?;
966 executor.commit().await?;
967
968 send_leader_change_and_set_flags(
969 &self.is_leader,
970 &self.leader_infancy,
971 &self.leader_watcher,
972 LeaderChangeMessage::Elected(Arc::new(leader_key)),
973 );
974 Ok(())
975 }
976}
977
978#[cfg(test)]
979mod tests {
980 use std::assert_matches::assert_matches;
981 use std::env;
982
983 use common_meta::maybe_skip_mysql_integration_test;
984 use common_telemetry::init_default_ut_logging;
985
986 use super::*;
987 use crate::bootstrap::create_mysql_pool;
988 use crate::error;
989
990 async fn create_mysql_client(
991 table_name: Option<&str>,
992 execution_timeout: Duration,
993 wait_timeout: Duration,
994 ) -> Result<Mutex<ElectionMysqlClient>> {
995 init_default_ut_logging();
996 let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
997 if endpoint.is_empty() {
998 return UnexpectedSnafu {
999 violated: "MySQL endpoint is empty".to_string(),
1000 }
1001 .fail();
1002 }
1003 let pool = create_mysql_pool(&[endpoint]).await.unwrap();
1004 let mut client = ElectionMysqlClient::new(
1005 pool,
1006 execution_timeout,
1007 execution_timeout,
1008 Duration::from_secs(1),
1009 wait_timeout,
1010 table_name.unwrap_or("default_greptime_metakv_election"),
1011 );
1012 client.maybe_init_client().await?;
1013 if table_name.is_some() {
1014 client.ensure_table_exists().await?;
1015 }
1016 Ok(Mutex::new(client))
1017 }
1018
1019 async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1020 let mut client = client.lock().await;
1021 let sql = format!("DROP TABLE IF EXISTS {};", table_name);
1022 client.execute(sqlx::query(&sql), &sql).await.unwrap();
1023 }
1024
    /// Exercises put/get/update/delete and prefix scans against a live MySQL table.
    ///
    /// NOTE(review): presumably skipped when the MySQL integration environment is not
    /// configured (see `maybe_skip_mysql_integration_test!`).
    #[tokio::test]
    async fn test_mysql_crud() {
        maybe_skip_mysql_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_mysql_crud_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);

        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        // Open a transaction and lock the rows; it is dropped at the end of this scope
        // without an explicit commit.
        {
            let mut a = client.lock().await;
            let txn = a.transaction().await.unwrap();
            let mut executor = Executor::Txn(txn);
            let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
            let query = sqlx::query(&raw_query);
            let _ = executor.query(query, &raw_query).await.unwrap();
        }

        let (tx, _) = broadcast::channel(100);
        let mysql_election = MySqlElection {
            leader_value: "test_leader".to_string(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        // Single key: insert, read back, CAS-update, delete, and confirm it is gone.
        let res = mysql_election
            .put_value_with_lease(&key, &value, 10, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let lease = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        mysql_election
            .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
            .await
            .unwrap();

        let res = mysql_election
            .delete_value(&key, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let res = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap();
        assert!(res.is_none());

        // Prefix scan: ten keys sharing the `test_key` prefix.
        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            mysql_election
                .put_value_with_lease(&key, &value, 10, &mut executor)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // With no matching rows, the returned "current" time stays at its default.
        let (res, current) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());

        // Release the client lock before `drop_table` locks it again.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1131
1132 async fn candidate(
1133 leader_value: String,
1134 candidate_lease_ttl: Duration,
1135 store_key_prefix: String,
1136 table_name: String,
1137 ) {
1138 let meta_lease_ttl = Duration::from_secs(2);
1139 let execution_timeout = Duration::from_secs(10);
1140 let idle_session_timeout = Duration::from_secs(0);
1141 let client =
1142 create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1143 .await
1144 .unwrap();
1145
1146 let (tx, _) = broadcast::channel(100);
1147 let mysql_election = MySqlElection {
1148 leader_value,
1149 client,
1150 is_leader: AtomicBool::new(false),
1151 leader_infancy: AtomicBool::new(true),
1152 leader_watcher: tx,
1153 store_key_prefix,
1154 candidate_lease_ttl,
1155 meta_lease_ttl,
1156 sql_set: ElectionSqlFactory::new(&table_name).build(),
1157 };
1158
1159 let node_info = MetasrvNodeInfo {
1160 addr: "test_addr".to_string(),
1161 version: "test_version".to_string(),
1162 git_commit: "test_git_commit".to_string(),
1163 start_time_ms: 0,
1164 };
1165 mysql_election.register_candidate(&node_info).await.unwrap();
1166 }
1167
    /// Ten concurrent candidates register; all are listed while alive and none once their
    /// keep-alive tasks are aborted and the leases have run out.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_mysql_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime_metakv";
        let mut handles = vec![];
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Give every candidate time to register and renew at least once.
        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let mysql_election = MySqlElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid.clone(),
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        let candidates = mysql_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        // Stop renewing; the rows remain but their leases expire.
        for handle in handles {
            handle.abort();
        }

        tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
        let candidates = mysql_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // The expired rows still exist and can be deleted.
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // Release the client lock before `drop_table` locks it again.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1238
1239 async fn elected(
1240 election: &MySqlElection,
1241 table_name: &str,
1242 expected_lease: Option<Lease>,
1243 ) -> Result<()> {
1244 let mut client = election.client.lock().await;
1245 let txn = client.transaction().await.unwrap();
1246 let mut executor = Executor::Txn(txn);
1247 let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1248 let query = sqlx::query(&raw_query);
1249 let _ = executor.query(query, &raw_query).await.unwrap();
1250 election.elected(executor, expected_lease).await
1251 }
1252
1253 async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1254 let client = election.client.lock().await;
1255 let mut executor = Executor::Default(client);
1256 election
1257 .get_value_with_lease(&election.election_key(), &mut executor)
1258 .await
1259 .unwrap()
1260 }
1261
1262 #[tokio::test]
1263 async fn test_elected_with_incorrect_lease_fails() {
1264 maybe_skip_mysql_integration_test!();
1265 let leader_value = "test_leader".to_string();
1266 let candidate_lease_ttl = Duration::from_secs(5);
1267 let meta_lease_ttl = Duration::from_secs(2);
1268 let execution_timeout = Duration::from_secs(10);
1269 let idle_session_timeout = Duration::from_secs(0);
1270 let uuid = uuid::Uuid::new_v4().to_string();
1271 let table_name = "test_elected_failed_greptime_metakv";
1272 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1273 .await
1274 .unwrap();
1275
1276 let (tx, _) = broadcast::channel(100);
1277 let leader_mysql_election = MySqlElection {
1278 leader_value: leader_value.clone(),
1279 client,
1280 is_leader: AtomicBool::new(false),
1281 leader_infancy: AtomicBool::new(true),
1282 leader_watcher: tx,
1283 store_key_prefix: uuid,
1284 candidate_lease_ttl,
1285 meta_lease_ttl,
1286 sql_set: ElectionSqlFactory::new(table_name).build(),
1287 };
1288
1289 let incorrect_lease = Lease::default();
1290 let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
1291 .await
1292 .unwrap_err();
1293 assert_matches!(err, error::Error::LeaderLeaseChanged { .. });
1294 let lease = get_lease(&leader_mysql_election).await;
1295 assert!(lease.is_none());
1296 drop_table(&leader_mysql_election.client, table_name).await;
1297 }
1298
1299 #[tokio::test]
1300 async fn test_reelection_with_idle_session_timeout() {
1301 maybe_skip_mysql_integration_test!();
1302 let leader_value = "test_leader".to_string();
1303 let uuid = uuid::Uuid::new_v4().to_string();
1304 let table_name = "test_reelection_greptime_metakv";
1305 let candidate_lease_ttl = Duration::from_secs(5);
1306 let meta_lease_ttl = Duration::from_secs(5);
1307 let execution_timeout = Duration::from_secs(10);
1308 let idle_session_timeout = Duration::from_secs(2);
1309 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1310 .await
1311 .unwrap();
1312
1313 let (tx, _) = broadcast::channel(100);
1314 let leader_mysql_election = MySqlElection {
1315 leader_value: leader_value.clone(),
1316 client,
1317 is_leader: AtomicBool::new(false),
1318 leader_infancy: AtomicBool::new(true),
1319 leader_watcher: tx,
1320 store_key_prefix: uuid,
1321 candidate_lease_ttl,
1322 meta_lease_ttl,
1323 sql_set: ElectionSqlFactory::new(table_name).build(),
1324 };
1325
1326 elected(&leader_mysql_election, table_name, None)
1327 .await
1328 .unwrap();
1329 let lease = get_lease(&leader_mysql_election).await.unwrap();
1330 assert_eq!(lease.leader_value, leader_value);
1331 assert!(lease.expire_time > lease.current);
1332 assert!(leader_mysql_election.is_leader());
1333 tokio::time::sleep(Duration::from_millis(2100)).await;
1335 leader_mysql_election
1337 .client
1338 .lock()
1339 .await
1340 .query(sqlx::query("SELECT 1"), "SELECT 1")
1341 .await
1342 .unwrap_err();
1343 leader_mysql_election
1345 .client
1346 .lock()
1347 .await
1348 .reset_client()
1349 .await
1350 .unwrap();
1351
1352 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1354 .await
1355 .unwrap();
1356 let lease = get_lease(&leader_mysql_election).await.unwrap();
1357 assert_eq!(lease.leader_value, leader_value);
1358 assert!(lease.expire_time > lease.current);
1359 assert!(leader_mysql_election.is_leader());
1360 drop_table(&leader_mysql_election.client, table_name).await;
1361 }
1362
1363 #[tokio::test]
1364 async fn test_elected_and_step_down() {
1365 maybe_skip_mysql_integration_test!();
1366 let leader_value = "test_leader".to_string();
1367 let candidate_lease_ttl = Duration::from_secs(5);
1368 let meta_lease_ttl = Duration::from_secs(2);
1369 let execution_timeout = Duration::from_secs(10);
1370 let idle_session_timeout = Duration::from_secs(0);
1371 let uuid = uuid::Uuid::new_v4().to_string();
1372 let table_name = "test_elected_and_step_down_greptime_metakv";
1373 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1374 .await
1375 .unwrap();
1376
1377 let (tx, mut rx) = broadcast::channel(100);
1378 let leader_mysql_election = MySqlElection {
1379 leader_value: leader_value.clone(),
1380 client,
1381 is_leader: AtomicBool::new(false),
1382 leader_infancy: AtomicBool::new(true),
1383 leader_watcher: tx,
1384 store_key_prefix: uuid,
1385 candidate_lease_ttl,
1386 meta_lease_ttl,
1387 sql_set: ElectionSqlFactory::new(table_name).build(),
1388 };
1389
1390 elected(&leader_mysql_election, table_name, None)
1391 .await
1392 .unwrap();
1393 let lease = get_lease(&leader_mysql_election).await.unwrap();
1394 assert_eq!(lease.leader_value, leader_value);
1395 assert!(lease.expire_time > lease.current);
1396 assert!(leader_mysql_election.is_leader());
1397
1398 match rx.recv().await {
1399 Ok(LeaderChangeMessage::Elected(key)) => {
1400 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1401 assert_eq!(
1402 String::from_utf8_lossy(key.key()),
1403 leader_mysql_election.election_key()
1404 );
1405 assert_eq!(key.lease_id(), i64::default());
1406 assert_eq!(key.revision(), i64::default());
1407 }
1408 _ => panic!("Expected LeaderChangeMessage::Elected"),
1409 }
1410
1411 leader_mysql_election
1412 .step_down_without_lock()
1413 .await
1414 .unwrap();
1415 let lease = get_lease(&leader_mysql_election).await.unwrap();
1416 assert_eq!(lease.leader_value, leader_value);
1417 assert!(!leader_mysql_election.is_leader());
1418
1419 match rx.recv().await {
1420 Ok(LeaderChangeMessage::StepDown(key)) => {
1421 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1422 assert_eq!(
1423 String::from_utf8_lossy(key.key()),
1424 leader_mysql_election.election_key()
1425 );
1426 assert_eq!(key.lease_id(), i64::default());
1427 assert_eq!(key.revision(), i64::default());
1428 }
1429 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1430 }
1431
1432 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1433 .await
1434 .unwrap();
1435 let lease = get_lease(&leader_mysql_election).await.unwrap();
1436 assert_eq!(lease.leader_value, leader_value);
1437 assert!(lease.expire_time > lease.current);
1438 assert!(leader_mysql_election.is_leader());
1439
1440 match rx.recv().await {
1441 Ok(LeaderChangeMessage::Elected(key)) => {
1442 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1443 assert_eq!(
1444 String::from_utf8_lossy(key.key()),
1445 leader_mysql_election.election_key()
1446 );
1447 assert_eq!(key.lease_id(), i64::default());
1448 assert_eq!(key.revision(), i64::default());
1449 }
1450 _ => panic!("Expected LeaderChangeMessage::Elected"),
1451 }
1452
1453 drop_table(&leader_mysql_election.client, table_name).await;
1454 }
1455
1456 #[tokio::test]
1457 async fn test_campaign() {
1458 maybe_skip_mysql_integration_test!();
1459 let leader_value = "test_leader".to_string();
1460 let uuid = uuid::Uuid::new_v4().to_string();
1461 let table_name = "test_leader_action_greptime_metakv";
1462 let candidate_lease_ttl = Duration::from_secs(5);
1463 let meta_lease_ttl = Duration::from_secs(2);
1464 let execution_timeout = Duration::from_secs(10);
1465 let idle_session_timeout = Duration::from_secs(0);
1466 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1467 .await
1468 .unwrap();
1469
1470 let (tx, mut rx) = broadcast::channel(100);
1471 let leader_mysql_election = MySqlElection {
1472 leader_value: leader_value.clone(),
1473 client,
1474 is_leader: AtomicBool::new(false),
1475 leader_infancy: AtomicBool::new(true),
1476 leader_watcher: tx,
1477 store_key_prefix: uuid,
1478 candidate_lease_ttl,
1479 meta_lease_ttl,
1480 sql_set: ElectionSqlFactory::new(table_name).build(),
1481 };
1482
1483 leader_mysql_election.do_campaign().await.unwrap();
1485 let lease = get_lease(&leader_mysql_election).await.unwrap();
1486 assert_eq!(lease.leader_value, leader_value);
1487 assert!(lease.expire_time > lease.current);
1488 assert!(leader_mysql_election.is_leader());
1489
1490 match rx.recv().await {
1491 Ok(LeaderChangeMessage::Elected(key)) => {
1492 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1493 assert_eq!(
1494 String::from_utf8_lossy(key.key()),
1495 leader_mysql_election.election_key()
1496 );
1497 assert_eq!(key.lease_id(), i64::default());
1498 assert_eq!(key.revision(), i64::default());
1499 }
1500 _ => panic!("Expected LeaderChangeMessage::Elected"),
1501 }
1502
1503 leader_mysql_election.do_campaign().await.unwrap();
1505 let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1506 assert_eq!(lease.leader_value, leader_value);
1507 assert!(new_lease.expire_time > lease.expire_time);
1509 assert!(new_lease.expire_time > new_lease.current);
1510 assert!(leader_mysql_election.is_leader());
1511
1512 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1514 leader_mysql_election.do_campaign().await.unwrap();
1515 let lease = get_lease(&leader_mysql_election).await.unwrap();
1516 assert_eq!(lease.leader_value, leader_value);
1517 assert!(lease.expire_time <= lease.current);
1518 assert!(!leader_mysql_election.is_leader());
1519
1520 match rx.recv().await {
1521 Ok(LeaderChangeMessage::StepDown(key)) => {
1522 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1523 assert_eq!(
1524 String::from_utf8_lossy(key.key()),
1525 leader_mysql_election.election_key()
1526 );
1527 assert_eq!(key.lease_id(), i64::default());
1528 assert_eq!(key.revision(), i64::default());
1529 }
1530 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1531 }
1532
1533 leader_mysql_election.do_campaign().await.unwrap();
1535 let lease = get_lease(&leader_mysql_election).await.unwrap();
1536 assert_eq!(lease.leader_value, leader_value);
1537 assert!(lease.expire_time > lease.current);
1538 assert!(leader_mysql_election.is_leader());
1539
1540 match rx.recv().await {
1541 Ok(LeaderChangeMessage::Elected(key)) => {
1542 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1543 assert_eq!(
1544 String::from_utf8_lossy(key.key()),
1545 leader_mysql_election.election_key()
1546 );
1547 assert_eq!(key.lease_id(), i64::default());
1548 assert_eq!(key.revision(), i64::default());
1549 }
1550 _ => panic!("Expected LeaderChangeMessage::Elected"),
1551 }
1552
1553 {
1555 let client = leader_mysql_election.client.lock().await;
1556 let mut executor = Executor::Default(client);
1557 leader_mysql_election
1558 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1559 .await
1560 .unwrap();
1561 }
1562 leader_mysql_election.do_campaign().await.unwrap();
1563 let res = get_lease(&leader_mysql_election).await;
1564 assert!(res.is_none());
1565 assert!(!leader_mysql_election.is_leader());
1566
1567 match rx.recv().await {
1568 Ok(LeaderChangeMessage::StepDown(key)) => {
1569 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1570 assert_eq!(
1571 String::from_utf8_lossy(key.key()),
1572 leader_mysql_election.election_key()
1573 );
1574 assert_eq!(key.lease_id(), i64::default());
1575 assert_eq!(key.revision(), i64::default());
1576 }
1577 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1578 }
1579
1580 leader_mysql_election.do_campaign().await.unwrap();
1582 let lease = get_lease(&leader_mysql_election).await.unwrap();
1583 assert_eq!(lease.leader_value, leader_value);
1584 assert!(lease.expire_time > lease.current);
1585 assert!(leader_mysql_election.is_leader());
1586
1587 match rx.recv().await {
1588 Ok(LeaderChangeMessage::Elected(key)) => {
1589 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1590 assert_eq!(
1591 String::from_utf8_lossy(key.key()),
1592 leader_mysql_election.election_key()
1593 );
1594 assert_eq!(key.lease_id(), i64::default());
1595 assert_eq!(key.revision(), i64::default());
1596 }
1597 _ => panic!("Expected LeaderChangeMessage::Elected"),
1598 }
1599
1600 let another_leader_key = "another_leader";
1602 {
1603 let client = leader_mysql_election.client.lock().await;
1604 let mut executor = Executor::Default(client);
1605 leader_mysql_election
1606 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1607 .await
1608 .unwrap();
1609 leader_mysql_election
1610 .put_value_with_lease(
1611 &leader_mysql_election.election_key(),
1612 another_leader_key,
1613 10,
1614 &mut executor,
1615 )
1616 .await
1617 .unwrap();
1618 }
1619 leader_mysql_election.do_campaign().await.unwrap();
1620 let lease = get_lease(&leader_mysql_election).await.unwrap();
1621 assert_eq!(lease.leader_value, another_leader_key);
1623 assert!(lease.expire_time > lease.current);
1624 assert!(!leader_mysql_election.is_leader());
1625
1626 match rx.recv().await {
1627 Ok(LeaderChangeMessage::StepDown(key)) => {
1628 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1629 assert_eq!(
1630 String::from_utf8_lossy(key.key()),
1631 leader_mysql_election.election_key()
1632 );
1633 assert_eq!(key.lease_id(), i64::default());
1634 assert_eq!(key.revision(), i64::default());
1635 }
1636 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1637 }
1638
1639 drop_table(&leader_mysql_election.client, table_name).await;
1640 }
1641
1642 #[tokio::test]
1643 async fn test_follower_action() {
1644 maybe_skip_mysql_integration_test!();
1645 common_telemetry::init_default_ut_logging();
1646 let candidate_lease_ttl = Duration::from_secs(5);
1647 let meta_lease_ttl = Duration::from_secs(1);
1648 let execution_timeout = Duration::from_secs(10);
1649 let idle_session_timeout = Duration::from_secs(0);
1650 let uuid = uuid::Uuid::new_v4().to_string();
1651 let table_name = "test_follower_action_greptime_metakv";
1652
1653 let follower_client =
1654 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1655 .await
1656 .unwrap();
1657 let (tx, mut rx) = broadcast::channel(100);
1658 let follower_mysql_election = MySqlElection {
1659 leader_value: "test_follower".to_string(),
1660 client: follower_client,
1661 is_leader: AtomicBool::new(false),
1662 leader_infancy: AtomicBool::new(true),
1663 leader_watcher: tx,
1664 store_key_prefix: uuid.clone(),
1665 candidate_lease_ttl,
1666 meta_lease_ttl,
1667 sql_set: ElectionSqlFactory::new(table_name).build(),
1668 };
1669
1670 let leader_client =
1671 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1672 .await
1673 .unwrap();
1674 let (tx, _) = broadcast::channel(100);
1675 let leader_mysql_election = MySqlElection {
1676 leader_value: "test_leader".to_string(),
1677 client: leader_client,
1678 is_leader: AtomicBool::new(false),
1679 leader_infancy: AtomicBool::new(true),
1680 leader_watcher: tx,
1681 store_key_prefix: uuid,
1682 candidate_lease_ttl,
1683 meta_lease_ttl,
1684 sql_set: ElectionSqlFactory::new(table_name).build(),
1685 };
1686
1687 leader_mysql_election.do_campaign().await.unwrap();
1688
1689 follower_mysql_election.do_campaign().await.unwrap();
1691
1692 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1694 follower_mysql_election.do_campaign().await.unwrap();
1695 assert!(follower_mysql_election.is_leader());
1696
1697 match rx.recv().await {
1698 Ok(LeaderChangeMessage::Elected(key)) => {
1699 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1700 assert_eq!(
1701 String::from_utf8_lossy(key.key()),
1702 follower_mysql_election.election_key()
1703 );
1704 assert_eq!(key.lease_id(), i64::default());
1705 assert_eq!(key.revision(), i64::default());
1706 }
1707 _ => panic!("Expected LeaderChangeMessage::Elected"),
1708 }
1709
1710 drop_table(&follower_mysql_election.client, table_name).await;
1711 }
1712
1713 #[tokio::test]
1714 async fn test_wait_timeout() {
1715 maybe_skip_mysql_integration_test!();
1716 common_telemetry::init_default_ut_logging();
1717 let execution_timeout = Duration::from_secs(10);
1718 let idle_session_timeout = Duration::from_secs(1);
1719
1720 let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1721 .await
1722 .unwrap();
1723 tokio::time::sleep(Duration::from_millis(1100)).await;
1724 let err = client
1726 .lock()
1727 .await
1728 .query(sqlx::query("SELECT 1"), "SELECT 1")
1729 .await
1730 .unwrap_err();
1731 assert_matches!(err, error::Error::MySqlExecution { .. });
1732 client.lock().await.reset_client().await.unwrap();
1734 let _ = client
1735 .lock()
1736 .await
1737 .query(sqlx::query("SELECT 1"), "SELECT 1")
1738 .await
1739 .unwrap();
1740 }
1741}