1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_telemetry::{error, info, warn};
20use common_time::Timestamp;
21use snafu::{OptionExt, ResultExt, ensure};
22use sqlx::mysql::{MySqlArguments, MySqlRow};
23use sqlx::pool::PoolConnection;
24use sqlx::query::Query;
25use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
26use tokio::sync::{Mutex, MutexGuard, broadcast};
27use tokio::time::MissedTickBehavior;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31 Election, ElectionRef, LeaderChangeMessage, LeaderValue, MetasrvNodeInfo, listen_leader_change,
32 send_leader_change_and_set_flags,
33};
34use crate::error::{
35 AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36 ElectionLeaderLeaseChangedSnafu, ElectionLeaderLeaseExpiredSnafu, ElectionNoLeaderSnafu,
37 MySqlExecutionSnafu, Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
40
/// Renders the SQL statements the MySQL election issues against one table.
///
/// The factory only borrows the table name; call `build` to obtain an owned
/// `ElectionSqlSet` with every statement pre-rendered.
struct ElectionSqlFactory<'a> {
    /// Name of the election table, interpolated (backtick-quoted) into every statement.
    table_name: &'a str,
}
44
/// Pre-rendered SQL statements for a single election table, produced by
/// `ElectionSqlFactory::build`.
struct ElectionSqlSet {
    /// `SELECT * ... FOR UPDATE` over the whole table, used to serialize campaigns.
    campaign: String,
    /// Upsert of `k` with `v = value || LEASE_SEP || (NOW(4) + ttl)`.
    /// Bind order: key, value, TTL in seconds.
    put_value_with_lease: String,
    /// Compare-and-set of `v` that also refreshes the lease deadline.
    /// Bind order: new value, TTL in seconds, key, previous raw `v`.
    update_value_with_lease: String,
    /// Raw `v` plus the database clock for one key. Bind: key.
    get_value_with_lease: String,
    /// Raw `v` plus the database clock for every key matching a `LIKE` pattern. Bind: pattern.
    get_value_with_lease_by_prefix: String,
    /// Deletion of one key. Bind: key.
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90 fn new(table_name: &'a str) -> Self {
91 Self { table_name }
92 }
93
94 fn build(self) -> ElectionSqlSet {
95 ElectionSqlSet {
96 campaign: self.campaign_sql(),
97 put_value_with_lease: self.put_value_with_lease_sql(),
98 update_value_with_lease: self.update_value_with_lease_sql(),
99 get_value_with_lease: self.get_value_with_lease_sql(),
100 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101 delete_value: self.delete_value_sql(),
102 }
103 }
104
105 fn campaign_sql(&self) -> String {
108 format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109 }
110
111 fn put_value_with_lease_sql(&self) -> String {
112 format!(
113 r#"
114 INSERT INTO `{}` (k, v) VALUES (
115 ?,
116 CONCAT(
117 ?,
118 '{}',
119 DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120 )
121 )
122 ON DUPLICATE KEY UPDATE v = VALUES(v);
123 "#,
124 self.table_name, LEASE_SEP
125 )
126 }
127
128 fn update_value_with_lease_sql(&self) -> String {
129 format!(
130 r#"UPDATE `{}`
131 SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132 WHERE k = ? AND v = ?"#,
133 self.table_name, LEASE_SEP
134 )
135 }
136
137 fn get_value_with_lease_sql(&self) -> String {
138 format!(
139 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140 self.table_name
141 )
142 }
143
144 fn get_value_with_lease_by_prefix_sql(&self) -> String {
145 format!(
146 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147 self.table_name
148 )
149 }
150
151 fn delete_value_sql(&self) -> String {
152 format!("DELETE FROM `{}` WHERE k = ?;", self.table_name)
153 }
154}
155
156enum Executor<'a> {
157 Default(MutexGuard<'a, ElectionMysqlClient>),
158 Txn(TransactionWithExecutionTimeout<'a>),
159}
160
161impl Executor<'_> {
162 async fn query(
163 &mut self,
164 query: Query<'_, MySql, MySqlArguments>,
165 sql: &str,
166 ) -> Result<Vec<MySqlRow>> {
167 match self {
168 Executor::Default(client) => client.query(query, sql).await,
169 Executor::Txn(txn) => txn.query(query, sql).await,
170 }
171 }
172
173 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174 match self {
175 Executor::Default(client) => client.execute(query, sql).await,
176 Executor::Txn(txn) => txn.execute(query, sql).await,
177 }
178 }
179
180 async fn commit(self) -> Result<()> {
181 match self {
182 Executor::Txn(txn) => txn.commit().await,
183 _ => Ok(()),
184 }
185 }
186}
187
188pub struct ElectionMysqlClient {
190 current: Option<PoolConnection<MySql>>,
191 pool: MySqlPool,
192
193 execution_timeout: Duration,
198
199 max_execution_time: Duration,
204
205 innode_lock_wait_timeout: Duration,
210
211 wait_timeout: Duration,
217
218 table_for_election: String,
220}
221
222impl ElectionMysqlClient {
223 pub fn new(
224 pool: MySqlPool,
225 execution_timeout: Duration,
226 max_execution_time: Duration,
227 innode_lock_wait_timeout: Duration,
228 wait_timeout: Duration,
229 table_for_election: &str,
230 ) -> Self {
231 Self {
232 current: None,
233 pool,
234 execution_timeout,
235 max_execution_time,
236 innode_lock_wait_timeout,
237 wait_timeout,
238 table_for_election: table_for_election.to_string(),
239 }
240 }
241
242 fn create_table_sql(&self) -> String {
243 format!(
244 r#"
245 CREATE TABLE IF NOT EXISTS `{}` (
246 k VARBINARY(3072) PRIMARY KEY,
247 v BLOB
248 );"#,
249 self.table_for_election
250 )
251 }
252
253 fn insert_once_sql(&self) -> String {
254 format!(
255 "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256 self.table_for_election
257 )
258 }
259
260 fn check_version_sql(&self) -> String {
261 "SELECT @@version;".to_string()
262 }
263
264 async fn reset_client(&mut self) -> Result<()> {
265 self.current = None;
266 self.maybe_init_client().await
267 }
268
269 async fn ensure_table_exists(&mut self) -> Result<()> {
270 let create_table_sql = self.create_table_sql();
271 let query = sqlx::query(&create_table_sql);
272 self.execute(query, &create_table_sql).await?;
273 let insert_once_sql = self.insert_once_sql();
275 let query = sqlx::query(&insert_once_sql);
276 self.execute(query, &insert_once_sql).await?;
277 Ok(())
278 }
279
280 async fn maybe_init_client(&mut self) -> Result<()> {
281 if self.current.is_none() {
282 let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284 self.current = Some(client);
285 let (query, sql) = if !self.wait_timeout.is_zero() {
286 let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287 (
288 sqlx::query(sql)
289 .bind(self.wait_timeout.as_secs())
290 .bind(self.innode_lock_wait_timeout.as_secs())
291 .bind(self.max_execution_time.as_millis() as u64),
292 sql,
293 )
294 } else {
295 let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296 (
297 sqlx::query(sql)
298 .bind(self.innode_lock_wait_timeout.as_secs())
299 .bind(self.max_execution_time.as_millis() as u64),
300 sql,
301 )
302 };
303 self.set_session_isolation_level().await?;
304 self.execute(query, sql).await?;
305 self.check_version(&self.check_version_sql()).await?;
306 }
307 Ok(())
308 }
309
310 async fn set_session_isolation_level(&mut self) -> Result<()> {
315 let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316 let query = sqlx::query(sql);
317 self.execute(query, sql).await?;
318 Ok(())
319 }
320
321 async fn check_version(&mut self, sql: &str) -> Result<()> {
326 let query = sqlx::query(sql);
328 let client = self.current.as_mut().unwrap();
330 let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331 .await
332 .map_err(|_| {
333 SqlExecutionTimeoutSnafu {
334 sql,
335 duration: self.execution_timeout,
336 }
337 .build()
338 })?;
339 match row {
340 Ok(row) => {
341 let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342 if !version.starts_with("8.0") && !version.starts_with("5.7") {
343 warn!(
344 "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345 version
346 );
347 }
348 }
349 Err(e) => {
350 warn!(e; "Failed to check MySQL version through sql: {}", sql);
351 }
352 }
353 Ok(())
354 }
355
356 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361 let client = self.current.as_mut().unwrap();
363 let future = query.execute(&mut **client);
364 let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365 .await
366 .map_err(|_| {
367 SqlExecutionTimeoutSnafu {
368 sql,
369 duration: self.execution_timeout,
370 }
371 .build()
372 })?
373 .context(MySqlExecutionSnafu { sql })?
374 .rows_affected();
375 Ok(rows_affected)
376 }
377
378 async fn query(
383 &mut self,
384 query: Query<'_, MySql, MySqlArguments>,
385 sql: &str,
386 ) -> Result<Vec<MySqlRow>> {
387 let client = self.current.as_mut().unwrap();
389 let future = query.fetch_all(&mut **client);
390 tokio::time::timeout(self.execution_timeout, future)
391 .await
392 .map_err(|_| {
393 SqlExecutionTimeoutSnafu {
394 sql,
395 duration: self.execution_timeout,
396 }
397 .build()
398 })?
399 .context(MySqlExecutionSnafu { sql })
400 }
401
402 async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403 use sqlx::Acquire;
404 let client = self.current.as_mut().unwrap();
405 let transaction = client
406 .begin()
407 .await
408 .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410 Ok(TransactionWithExecutionTimeout {
411 transaction,
412 execution_timeout: self.execution_timeout,
413 })
414 }
415}
416
417struct TransactionWithExecutionTimeout<'a> {
418 transaction: MySqlTransaction<'a>,
419 execution_timeout: Duration,
420}
421
422impl TransactionWithExecutionTimeout<'_> {
423 async fn query(
424 &mut self,
425 query: Query<'_, MySql, MySqlArguments>,
426 sql: &str,
427 ) -> Result<Vec<MySqlRow>> {
428 let res = tokio::time::timeout(
429 self.execution_timeout,
430 query.fetch_all(&mut *self.transaction),
431 )
432 .await
433 .map_err(|_| {
434 SqlExecutionTimeoutSnafu {
435 sql,
436 duration: self.execution_timeout,
437 }
438 .build()
439 })?
440 .context(MySqlExecutionSnafu { sql })?;
441 Ok(res)
442 }
443
444 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445 let res = tokio::time::timeout(
446 self.execution_timeout,
447 query.execute(&mut *self.transaction),
448 )
449 .await
450 .map_err(|_| {
451 SqlExecutionTimeoutSnafu {
452 sql,
453 duration: self.execution_timeout,
454 }
455 .build()
456 })?
457 .context(MySqlExecutionSnafu { sql })?;
458 Ok(res.rows_affected())
459 }
460
461 async fn commit(self) -> Result<()> {
462 tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463 .await
464 .map_err(|_| {
465 SqlExecutionTimeoutSnafu {
466 sql: "COMMIT",
467 duration: self.execution_timeout,
468 }
469 .build()
470 })?
471 .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472 Ok(())
473 }
474}
475
476pub struct MySqlElection {
478 leader_value: String,
479 client: Mutex<ElectionMysqlClient>,
480 is_leader: AtomicBool,
481 leader_infancy: AtomicBool,
482 leader_watcher: broadcast::Sender<LeaderChangeMessage>,
483 store_key_prefix: String,
484 candidate_lease_ttl: Duration,
485 meta_lease_ttl: Duration,
486 sql_set: ElectionSqlSet,
487}
488
489impl MySqlElection {
490 pub async fn with_mysql_client(
491 leader_value: String,
492 mut client: ElectionMysqlClient,
493 store_key_prefix: String,
494 candidate_lease_ttl: Duration,
495 meta_lease_ttl: Duration,
496 table_name: &str,
497 ) -> Result<ElectionRef> {
498 let sql_factory = ElectionSqlFactory::new(table_name);
499 client.maybe_init_client().await?;
500 client.ensure_table_exists().await?;
501 let tx = listen_leader_change(leader_value.clone());
502 Ok(Arc::new(Self {
503 leader_value,
504 client: Mutex::new(client),
505 is_leader: AtomicBool::new(false),
506 leader_infancy: AtomicBool::new(false),
507 leader_watcher: tx,
508 store_key_prefix,
509 candidate_lease_ttl,
510 meta_lease_ttl,
511 sql_set: sql_factory.build(),
512 }))
513 }
514
515 fn election_key(&self) -> String {
516 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517 }
518
519 fn candidate_root(&self) -> String {
520 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521 }
522
523 fn candidate_key(&self) -> String {
524 format!("{}{}", self.candidate_root(), self.leader_value)
525 }
526
527 async fn maybe_init_client(&self) -> Result<()> {
528 let mut client = self.client.lock().await;
529 client.maybe_init_client().await?;
530 Ok(())
531 }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536 type Leader = LeaderValue;
537
538 fn is_leader(&self) -> bool {
539 self.is_leader.load(Ordering::Relaxed)
540 }
541
542 fn in_leader_infancy(&self) -> bool {
543 self.leader_infancy
544 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545 .is_ok()
546 }
547
548 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549 self.maybe_init_client().await?;
550 let key = self.candidate_key();
551 let node_info =
552 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553 input: format!("{node_info:?}"),
554 })?;
555
556 {
557 let client = self.client.lock().await;
558 let mut executor = Executor::Default(client);
559 let res = self
560 .put_value_with_lease(
561 &key,
562 &node_info,
563 self.candidate_lease_ttl.as_secs(),
564 &mut executor,
565 )
566 .await?;
567 if !res {
569 warn!("Candidate already registered, update the lease");
570 self.delete_value(&key, &mut executor).await?;
571 self.put_value_with_lease(
572 &key,
573 &node_info,
574 self.candidate_lease_ttl.as_secs(),
575 &mut executor,
576 )
577 .await?;
578 }
579 }
580
581 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583 loop {
584 let _ = keep_alive_interval.tick().await;
585 let client = self.client.lock().await;
586 let mut executor = Executor::Default(client);
587 let lease = self
588 .get_value_with_lease(&key, &mut executor)
589 .await?
590 .unwrap_or_default();
591
592 ensure!(
593 lease.expire_time > lease.current,
594 UnexpectedSnafu {
595 err_msg: format!(
596 "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597 lease.expire_time,
598 lease.current,
599 String::from_utf8_lossy(&key.into_bytes())
600 ),
601 }
602 );
603
604 self.update_value_with_lease(
605 &key,
606 &lease.origin,
607 &node_info,
608 self.candidate_lease_ttl.as_secs(),
609 &mut executor,
610 )
611 .await?;
612 std::mem::drop(executor);
613 }
614 }
615
616 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617 self.maybe_init_client().await?;
618 let key_prefix = self.candidate_root();
619 let client = self.client.lock().await;
620 let mut executor = Executor::Default(client);
621 let (mut candidates, current) = self
622 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623 .await?;
624 candidates.retain(|c| c.1 > current);
626 let mut valid_candidates = Vec::with_capacity(candidates.len());
627 for (c, _) in candidates {
628 let node_info: MetasrvNodeInfo =
629 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630 input: format!("{:?}", c),
631 })?;
632 valid_candidates.push(node_info);
633 }
634 Ok(valid_candidates)
635 }
636
637 async fn campaign(&self) -> Result<()> {
638 self.maybe_init_client().await?;
639 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641 loop {
642 self.do_campaign().await?;
643 keep_alive_interval.tick().await;
644 }
645 }
646
647 async fn reset_campaign(&self) {
648 info!("Resetting campaign");
649 if self.is_leader.load(Ordering::Relaxed) {
650 if let Err(err) = self.step_down_without_lock().await {
651 error!(err; "Failed to step down without lock");
652 }
653 info!("Step down without lock successfully, due to reset campaign");
654 }
655 if let Err(err) = self.client.lock().await.reset_client().await {
656 error!(err; "Failed to reset client");
657 }
658 }
659
660 async fn leader(&self) -> Result<Self::Leader> {
661 self.maybe_init_client().await?;
662 if self.is_leader.load(Ordering::Relaxed) {
663 Ok(self.leader_value.as_bytes().into())
664 } else {
665 let key = self.election_key();
666
667 let client = self.client.lock().await;
668 let mut executor = Executor::Default(client);
669 if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
670 ensure!(lease.expire_time > lease.current, ElectionNoLeaderSnafu);
671 Ok(lease.leader_value.as_bytes().into())
672 } else {
673 ElectionNoLeaderSnafu.fail()
674 }
675 }
676 }
677
678 async fn resign(&self) -> Result<()> {
679 todo!()
680 }
681
682 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
683 self.leader_watcher.subscribe()
684 }
685}
686
687impl MySqlElection {
688 async fn get_value_with_lease(
690 &self,
691 key: &str,
692 executor: &mut Executor<'_>,
693 ) -> Result<Option<Lease>> {
694 let key = key.as_bytes();
695 let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
696 let res = executor
697 .query(query, &self.sql_set.get_value_with_lease)
698 .await?;
699
700 if res.is_empty() {
701 return Ok(None);
702 }
703 let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
705 let current_time = match Timestamp::from_str(¤t_time_str, None) {
706 Ok(ts) => ts,
707 Err(_) => UnexpectedSnafu {
708 err_msg: format!("Invalid timestamp: {}", current_time_str),
709 }
710 .fail()?,
711 };
712 let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
714 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
715
716 Ok(Some(Lease {
717 leader_value: value,
718 expire_time,
719 current: current_time,
720 origin: value_and_expire_time.to_string(),
721 }))
722 }
723
724 async fn get_value_with_lease_by_prefix(
726 &self,
727 key_prefix: &str,
728 executor: &mut Executor<'_>,
729 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
730 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
731 let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
732 let res = executor
733 .query(query, &self.sql_set.get_value_with_lease_by_prefix)
734 .await?;
735
736 let mut values_with_leases = vec![];
737 let mut current = Timestamp::default();
738 for row in res {
739 let current_time_str = row.try_get(1).unwrap_or_default();
740 current = match Timestamp::from_str(current_time_str, None) {
741 Ok(ts) => ts,
742 Err(_) => UnexpectedSnafu {
743 err_msg: format!("Invalid timestamp: {}", current_time_str),
744 }
745 .fail()?,
746 };
747
748 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
749 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
750
751 values_with_leases.push((value, expire_time));
752 }
753 Ok((values_with_leases, current))
754 }
755
756 async fn update_value_with_lease(
757 &self,
758 key: &str,
759 prev: &str,
760 updated: &str,
761 lease_ttl: u64,
762 executor: &mut Executor<'_>,
763 ) -> Result<()> {
764 let key = key.as_bytes();
765 let prev = prev.as_bytes();
766 let updated = updated.as_bytes();
767
768 let query = sqlx::query(&self.sql_set.update_value_with_lease)
769 .bind(updated)
770 .bind(lease_ttl as f64)
771 .bind(key)
772 .bind(prev);
773 let res = executor
774 .execute(query, &self.sql_set.update_value_with_lease)
775 .await?;
776
777 ensure!(
778 res == 1,
779 UnexpectedSnafu {
780 err_msg: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
781 }
782 );
783
784 Ok(())
785 }
786
787 async fn put_value_with_lease(
789 &self,
790 key: &str,
791 value: &str,
792 lease_ttl_secs: u64,
793 executor: &mut Executor<'_>,
794 ) -> Result<bool> {
795 let key = key.as_bytes();
796 let lease_ttl_secs = lease_ttl_secs as f64;
797 let query = sqlx::query(&self.sql_set.put_value_with_lease)
798 .bind(key)
799 .bind(value)
800 .bind(lease_ttl_secs);
801 let res = executor
802 .execute(query, &self.sql_set.put_value_with_lease)
803 .await?;
804 Ok(res == 1)
805 }
806
807 async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
810 let key = key.as_bytes();
811 let query = sqlx::query(&self.sql_set.delete_value).bind(key);
812 let res = executor.execute(query, &self.sql_set.delete_value).await?;
813
814 Ok(res == 1)
815 }
816
817 async fn do_campaign(&self) -> Result<()> {
820 let lease = {
821 let client = self.client.lock().await;
822 let mut executor = Executor::Default(client);
823 self.get_value_with_lease(&self.election_key(), &mut executor)
824 .await?
825 };
826
827 let is_leader = self.is_leader();
828 let is_current_leader = lease
831 .as_ref()
832 .map(|lease| lease.leader_value == self.leader_value)
833 .unwrap_or(false);
834 match (self.lease_check(&lease), is_leader, is_current_leader) {
835 (Ok(_), true, true) => {
837 let mut client = self.client.lock().await;
838 let txn = client.transaction().await?;
839 let mut executor = Executor::Txn(txn);
840 let query = sqlx::query(&self.sql_set.campaign);
841 executor.query(query, &self.sql_set.campaign).await?;
842 self.renew_lease(executor, lease.unwrap()).await?;
844 }
845 (Err(err), true, _) => {
848 warn!(err; "Leader lease expired, step down...");
849 self.step_down_without_lock().await?;
850 }
851 (Ok(_), true, false) => {
852 warn!("Leader lease expired, step down...");
853 self.step_down_without_lock().await?;
854 }
855 (Err(err), false, _) => {
857 warn!(err; "Leader lease expired, elect myself.");
858 let mut client = self.client.lock().await;
859 let txn = client.transaction().await?;
860 let mut executor = Executor::Txn(txn);
861 let query = sqlx::query(&self.sql_set.campaign);
862 executor.query(query, &self.sql_set.campaign).await?;
863 self.elected(executor, lease).await?;
864 }
865 (Ok(_), false, true) => {
868 warn!("I should be the leader, but I don't think so. Something went wrong.");
869 let mut client = self.client.lock().await;
870 let txn = client.transaction().await?;
871 let mut executor = Executor::Txn(txn);
872 let query = sqlx::query(&self.sql_set.campaign);
873 executor.query(query, &self.sql_set.campaign).await?;
874 self.renew_lease(executor, lease.unwrap()).await?;
876 }
877 (Ok(_), false, false) => {}
879 }
880 Ok(())
881 }
882
883 async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
885 let key = self.election_key();
886 self.update_value_with_lease(
887 &key,
888 &lease.origin,
889 &self.leader_value,
890 self.meta_lease_ttl.as_secs(),
891 &mut executor,
892 )
893 .await?;
894 executor.commit().await?;
895
896 if !self.is_leader() {
897 let key = self.election_key();
898 let leader_key = RdsLeaderKey {
899 name: self.leader_value.clone().into_bytes(),
900 key: key.clone().into_bytes(),
901 ..Default::default()
902 };
903 send_leader_change_and_set_flags(
904 &self.is_leader,
905 &self.leader_infancy,
906 &self.leader_watcher,
907 LeaderChangeMessage::Elected(Arc::new(leader_key)),
908 );
909 }
910
911 Ok(())
912 }
913
914 fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
923 let lease = lease.as_ref().context(ElectionNoLeaderSnafu)?;
924 ensure!(
926 lease.expire_time > lease.current,
927 ElectionLeaderLeaseExpiredSnafu
928 );
929 Ok(lease.clone())
931 }
932
933 async fn step_down_without_lock(&self) -> Result<()> {
935 let key = self.election_key().into_bytes();
936 let leader_key = RdsLeaderKey {
937 name: self.leader_value.clone().into_bytes(),
938 key: key.clone(),
939 ..Default::default()
940 };
941 send_leader_change_and_set_flags(
942 &self.is_leader,
943 &self.leader_infancy,
944 &self.leader_watcher,
945 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
946 );
947 Ok(())
948 }
949
950 async fn elected(
953 &self,
954 mut executor: Executor<'_>,
955 expected_lease: Option<Lease>,
956 ) -> Result<()> {
957 let key = self.election_key();
958 let leader_key = RdsLeaderKey {
959 name: self.leader_value.clone().into_bytes(),
960 key: key.clone().into_bytes(),
961 ..Default::default()
962 };
963 let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
964 ensure!(
965 expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
966 ElectionLeaderLeaseChangedSnafu
967 );
968 self.delete_value(&key, &mut executor).await?;
969 self.put_value_with_lease(
970 &key,
971 &self.leader_value,
972 self.meta_lease_ttl.as_secs(),
973 &mut executor,
974 )
975 .await?;
976 executor.commit().await?;
977
978 send_leader_change_and_set_flags(
979 &self.is_leader,
980 &self.leader_infancy,
981 &self.leader_watcher,
982 LeaderChangeMessage::Elected(Arc::new(leader_key)),
983 );
984 Ok(())
985 }
986}
987
988#[cfg(test)]
989mod tests {
990 use std::{assert_matches, env};
991
992 use common_telemetry::init_default_ut_logging;
993 use sqlx::MySqlPool;
994
995 use super::*;
996 use crate::{error, maybe_skip_mysql_integration_test};
997
998 async fn create_mysql_client(
999 table_name: Option<&str>,
1000 execution_timeout: Duration,
1001 wait_timeout: Duration,
1002 ) -> Result<Mutex<ElectionMysqlClient>> {
1003 init_default_ut_logging();
1004 let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
1005 if endpoint.is_empty() {
1006 return UnexpectedSnafu {
1007 err_msg: "MySQL endpoint is empty".to_string(),
1008 }
1009 .fail();
1010 }
1011 let pool = MySqlPool::connect(&endpoint).await.unwrap();
1012 let mut client = ElectionMysqlClient::new(
1013 pool,
1014 execution_timeout,
1015 execution_timeout,
1016 Duration::from_secs(1),
1017 wait_timeout,
1018 table_name.unwrap_or("default_greptime_metakv-election"),
1019 );
1020 client.maybe_init_client().await?;
1021 if table_name.is_some() {
1022 client.ensure_table_exists().await?;
1023 }
1024 Ok(Mutex::new(client))
1025 }
1026
1027 async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1028 let mut client = client.lock().await;
1029 let sql = format!("DROP TABLE IF EXISTS `{}`;", table_name);
1030 client.execute(sqlx::query(&sql), &sql).await.unwrap();
1031 }
1032
1033 #[tokio::test]
1034 async fn test_mysql_crud() {
1035 maybe_skip_mysql_integration_test!();
1036 let key = "test_key".to_string();
1037 let value = "test_value".to_string();
1038
1039 let uuid = uuid::Uuid::new_v4().to_string();
1040 let table_name = "test_mysql_crud_greptime-metakv";
1041 let candidate_lease_ttl = Duration::from_secs(10);
1042 let meta_lease_ttl = Duration::from_secs(2);
1043
1044 let execution_timeout = Duration::from_secs(10);
1045 let idle_session_timeout = Duration::from_secs(0);
1046 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1047 .await
1048 .unwrap();
1049
1050 {
1051 let mut a = client.lock().await;
1052 let txn = a.transaction().await.unwrap();
1053 let mut executor = Executor::Txn(txn);
1054 let raw_query = format!("SELECT * FROM `{}` FOR UPDATE;", table_name);
1055 let query = sqlx::query(&raw_query);
1056 let _ = executor.query(query, &raw_query).await.unwrap();
1057 }
1058
1059 let (tx, _) = broadcast::channel(100);
1060 let mysql_election = MySqlElection {
1061 leader_value: "test_leader".to_string(),
1062 client,
1063 is_leader: AtomicBool::new(false),
1064 leader_infancy: AtomicBool::new(true),
1065 leader_watcher: tx,
1066 store_key_prefix: uuid,
1067 candidate_lease_ttl,
1068 meta_lease_ttl,
1069 sql_set: ElectionSqlFactory::new(table_name).build(),
1070 };
1071 let client = mysql_election.client.lock().await;
1072 let mut executor = Executor::Default(client);
1073 let res = mysql_election
1074 .put_value_with_lease(&key, &value, 10, &mut executor)
1075 .await
1076 .unwrap();
1077 assert!(res);
1078
1079 let lease = mysql_election
1080 .get_value_with_lease(&key, &mut executor)
1081 .await
1082 .unwrap()
1083 .unwrap();
1084 assert_eq!(lease.leader_value, value);
1085
1086 mysql_election
1087 .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
1088 .await
1089 .unwrap();
1090
1091 let res = mysql_election
1092 .delete_value(&key, &mut executor)
1093 .await
1094 .unwrap();
1095 assert!(res);
1096
1097 let res = mysql_election
1098 .get_value_with_lease(&key, &mut executor)
1099 .await
1100 .unwrap();
1101 assert!(res.is_none());
1102
1103 for i in 0..10 {
1104 let key = format!("test_key_{}", i);
1105 let value = format!("test_value_{}", i);
1106 mysql_election
1107 .put_value_with_lease(&key, &value, 10, &mut executor)
1108 .await
1109 .unwrap();
1110 }
1111
1112 let key_prefix = "test_key".to_string();
1113 let (res, _) = mysql_election
1114 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
1115 .await
1116 .unwrap();
1117 assert_eq!(res.len(), 10);
1118
1119 for i in 0..10 {
1120 let key = format!("test_key_{}", i);
1121 let res = mysql_election
1122 .delete_value(&key, &mut executor)
1123 .await
1124 .unwrap();
1125 assert!(res);
1126 }
1127
1128 let (res, current) = mysql_election
1129 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
1130 .await
1131 .unwrap();
1132 assert!(res.is_empty());
1133 assert!(current == Timestamp::default());
1134
1135 std::mem::drop(executor);
1137 drop_table(&mysql_election.client, table_name).await;
1138 }
1139
1140 async fn candidate(
1141 leader_value: String,
1142 candidate_lease_ttl: Duration,
1143 store_key_prefix: String,
1144 table_name: String,
1145 ) {
1146 let meta_lease_ttl = Duration::from_secs(2);
1147 let execution_timeout = Duration::from_secs(10);
1148 let idle_session_timeout = Duration::from_secs(0);
1149 let client =
1150 create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1151 .await
1152 .unwrap();
1153
1154 let (tx, _) = broadcast::channel(100);
1155 let mysql_election = MySqlElection {
1156 leader_value,
1157 client,
1158 is_leader: AtomicBool::new(false),
1159 leader_infancy: AtomicBool::new(true),
1160 leader_watcher: tx,
1161 store_key_prefix,
1162 candidate_lease_ttl,
1163 meta_lease_ttl,
1164 sql_set: ElectionSqlFactory::new(&table_name).build(),
1165 };
1166
1167 let node_info = MetasrvNodeInfo {
1168 addr: "test_addr".to_string(),
1169 version: "test_version".to_string(),
1170 git_commit: "test_git_commit".to_string(),
1171 start_time_ms: 0,
1172 total_cpu_millicores: 0,
1173 total_memory_bytes: 0,
1174 cpu_usage_millicores: 0,
1175 memory_usage_bytes: 0,
1176 hostname: "test_hostname".to_string(),
1177 };
1178 mysql_election.register_candidate(&node_info).await.unwrap();
1179 }
1180
1181 #[tokio::test]
1182 async fn test_candidate_registration() {
1183 maybe_skip_mysql_integration_test!();
1184 let leader_value_prefix = "test_leader".to_string();
1185 let candidate_lease_ttl = Duration::from_secs(2);
1186 let execution_timeout = Duration::from_secs(10);
1187 let meta_lease_ttl = Duration::from_secs(2);
1188 let idle_session_timeout = Duration::from_secs(0);
1189 let uuid = uuid::Uuid::new_v4().to_string();
1190 let table_name = "test_candidate_registration_greptime-metakv";
1191 let mut handles = vec![];
1192 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1193 .await
1194 .unwrap();
1195
1196 for i in 0..10 {
1197 let leader_value = format!("{}{}", leader_value_prefix, i);
1198 let handle = tokio::spawn(candidate(
1199 leader_value,
1200 candidate_lease_ttl,
1201 uuid.clone(),
1202 table_name.to_string(),
1203 ));
1204 handles.push(handle);
1205 }
1206 tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
1208
1209 let (tx, _) = broadcast::channel(100);
1210 let leader_value = "test_leader".to_string();
1211 let mysql_election = MySqlElection {
1212 leader_value,
1213 client,
1214 is_leader: AtomicBool::new(false),
1215 leader_infancy: AtomicBool::new(true),
1216 leader_watcher: tx,
1217 store_key_prefix: uuid.clone(),
1218 candidate_lease_ttl,
1219 meta_lease_ttl,
1220 sql_set: ElectionSqlFactory::new(table_name).build(),
1221 };
1222
1223 let candidates = mysql_election.all_candidates().await.unwrap();
1224 assert_eq!(candidates.len(), 10);
1225
1226 for handle in handles {
1227 handle.abort();
1228 }
1229
1230 tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
1232 let candidates = mysql_election.all_candidates().await.unwrap();
1233 assert!(candidates.is_empty());
1234
1235 let client = mysql_election.client.lock().await;
1237 let mut executor = Executor::Default(client);
1238 for i in 0..10 {
1239 let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1240 let res = mysql_election
1241 .delete_value(&key, &mut executor)
1242 .await
1243 .unwrap();
1244 assert!(res);
1245 }
1246
1247 std::mem::drop(executor);
1249 drop_table(&mysql_election.client, table_name).await;
1250 }
1251
1252 async fn elected(
1253 election: &MySqlElection,
1254 table_name: &str,
1255 expected_lease: Option<Lease>,
1256 ) -> Result<()> {
1257 let mut client = election.client.lock().await;
1258 let txn = client.transaction().await.unwrap();
1259 let mut executor = Executor::Txn(txn);
1260 let raw_query = format!("SELECT * FROM `{}` FOR UPDATE;", table_name);
1261 let query = sqlx::query(&raw_query);
1262 let _ = executor.query(query, &raw_query).await.unwrap();
1263 election.elected(executor, expected_lease).await
1264 }
1265
1266 async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1267 let client = election.client.lock().await;
1268 let mut executor = Executor::Default(client);
1269 election
1270 .get_value_with_lease(&election.election_key(), &mut executor)
1271 .await
1272 .unwrap()
1273 }
1274
1275 #[tokio::test]
1276 async fn test_elected_with_incorrect_lease_fails() {
1277 maybe_skip_mysql_integration_test!();
1278 let leader_value = "test_leader".to_string();
1279 let candidate_lease_ttl = Duration::from_secs(5);
1280 let meta_lease_ttl = Duration::from_secs(2);
1281 let execution_timeout = Duration::from_secs(10);
1282 let idle_session_timeout = Duration::from_secs(0);
1283 let uuid = uuid::Uuid::new_v4().to_string();
1284 let table_name = "test_elected_failed_greptime-metakv";
1285 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1286 .await
1287 .unwrap();
1288
1289 let (tx, _) = broadcast::channel(100);
1290 let leader_mysql_election = MySqlElection {
1291 leader_value: leader_value.clone(),
1292 client,
1293 is_leader: AtomicBool::new(false),
1294 leader_infancy: AtomicBool::new(true),
1295 leader_watcher: tx,
1296 store_key_prefix: uuid,
1297 candidate_lease_ttl,
1298 meta_lease_ttl,
1299 sql_set: ElectionSqlFactory::new(table_name).build(),
1300 };
1301
1302 let incorrect_lease = Lease::default();
1303 let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
1304 .await
1305 .unwrap_err();
1306 assert_matches!(err, error::Error::ElectionLeaderLeaseChanged { .. });
1307 let lease = get_lease(&leader_mysql_election).await;
1308 assert!(lease.is_none());
1309 drop_table(&leader_mysql_election.client, table_name).await;
1310 }
1311
1312 #[tokio::test]
1313 async fn test_reelection_with_idle_session_timeout() {
1314 maybe_skip_mysql_integration_test!();
1315 let leader_value = "test_leader".to_string();
1316 let uuid = uuid::Uuid::new_v4().to_string();
1317 let table_name = "test_reelection_greptime-metakv";
1318 let candidate_lease_ttl = Duration::from_secs(5);
1319 let meta_lease_ttl = Duration::from_secs(5);
1320 let execution_timeout = Duration::from_secs(10);
1321 let idle_session_timeout = Duration::from_secs(2);
1322 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1323 .await
1324 .unwrap();
1325
1326 let (tx, _) = broadcast::channel(100);
1327 let leader_mysql_election = MySqlElection {
1328 leader_value: leader_value.clone(),
1329 client,
1330 is_leader: AtomicBool::new(false),
1331 leader_infancy: AtomicBool::new(true),
1332 leader_watcher: tx,
1333 store_key_prefix: uuid,
1334 candidate_lease_ttl,
1335 meta_lease_ttl,
1336 sql_set: ElectionSqlFactory::new(table_name).build(),
1337 };
1338
1339 elected(&leader_mysql_election, table_name, None)
1340 .await
1341 .unwrap();
1342 let lease = get_lease(&leader_mysql_election).await.unwrap();
1343 assert_eq!(lease.leader_value, leader_value);
1344 assert!(lease.expire_time > lease.current);
1345 assert!(leader_mysql_election.is_leader());
1346 tokio::time::sleep(Duration::from_millis(2100)).await;
1348 leader_mysql_election
1350 .client
1351 .lock()
1352 .await
1353 .query(sqlx::query("SELECT 1"), "SELECT 1")
1354 .await
1355 .unwrap_err();
1356 leader_mysql_election
1358 .client
1359 .lock()
1360 .await
1361 .reset_client()
1362 .await
1363 .unwrap();
1364
1365 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1367 .await
1368 .unwrap();
1369 let lease = get_lease(&leader_mysql_election).await.unwrap();
1370 assert_eq!(lease.leader_value, leader_value);
1371 assert!(lease.expire_time > lease.current);
1372 assert!(leader_mysql_election.is_leader());
1373 drop_table(&leader_mysql_election.client, table_name).await;
1374 }
1375
1376 #[tokio::test]
1377 async fn test_elected_and_step_down() {
1378 maybe_skip_mysql_integration_test!();
1379 let leader_value = "test_leader".to_string();
1380 let candidate_lease_ttl = Duration::from_secs(5);
1381 let meta_lease_ttl = Duration::from_secs(2);
1382 let execution_timeout = Duration::from_secs(10);
1383 let idle_session_timeout = Duration::from_secs(0);
1384 let uuid = uuid::Uuid::new_v4().to_string();
1385 let table_name = "test_elected_and_step_down_greptime-metakv";
1386 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1387 .await
1388 .unwrap();
1389
1390 let (tx, mut rx) = broadcast::channel(100);
1391 let leader_mysql_election = MySqlElection {
1392 leader_value: leader_value.clone(),
1393 client,
1394 is_leader: AtomicBool::new(false),
1395 leader_infancy: AtomicBool::new(true),
1396 leader_watcher: tx,
1397 store_key_prefix: uuid,
1398 candidate_lease_ttl,
1399 meta_lease_ttl,
1400 sql_set: ElectionSqlFactory::new(table_name).build(),
1401 };
1402
1403 elected(&leader_mysql_election, table_name, None)
1404 .await
1405 .unwrap();
1406 let lease = get_lease(&leader_mysql_election).await.unwrap();
1407 assert_eq!(lease.leader_value, leader_value);
1408 assert!(lease.expire_time > lease.current);
1409 assert!(leader_mysql_election.is_leader());
1410
1411 match rx.recv().await {
1412 Ok(LeaderChangeMessage::Elected(key)) => {
1413 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1414 assert_eq!(
1415 String::from_utf8_lossy(key.key()),
1416 leader_mysql_election.election_key()
1417 );
1418 assert_eq!(key.lease_id(), i64::default());
1419 assert_eq!(key.revision(), i64::default());
1420 }
1421 _ => panic!("Expected LeaderChangeMessage::Elected"),
1422 }
1423
1424 leader_mysql_election
1425 .step_down_without_lock()
1426 .await
1427 .unwrap();
1428 let lease = get_lease(&leader_mysql_election).await.unwrap();
1429 assert_eq!(lease.leader_value, leader_value);
1430 assert!(!leader_mysql_election.is_leader());
1431
1432 match rx.recv().await {
1433 Ok(LeaderChangeMessage::StepDown(key)) => {
1434 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1435 assert_eq!(
1436 String::from_utf8_lossy(key.key()),
1437 leader_mysql_election.election_key()
1438 );
1439 assert_eq!(key.lease_id(), i64::default());
1440 assert_eq!(key.revision(), i64::default());
1441 }
1442 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1443 }
1444
1445 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1446 .await
1447 .unwrap();
1448 let lease = get_lease(&leader_mysql_election).await.unwrap();
1449 assert_eq!(lease.leader_value, leader_value);
1450 assert!(lease.expire_time > lease.current);
1451 assert!(leader_mysql_election.is_leader());
1452
1453 match rx.recv().await {
1454 Ok(LeaderChangeMessage::Elected(key)) => {
1455 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1456 assert_eq!(
1457 String::from_utf8_lossy(key.key()),
1458 leader_mysql_election.election_key()
1459 );
1460 assert_eq!(key.lease_id(), i64::default());
1461 assert_eq!(key.revision(), i64::default());
1462 }
1463 _ => panic!("Expected LeaderChangeMessage::Elected"),
1464 }
1465
1466 drop_table(&leader_mysql_election.client, table_name).await;
1467 }
1468
1469 #[tokio::test]
1470 async fn test_campaign() {
1471 maybe_skip_mysql_integration_test!();
1472 let leader_value = "test_leader".to_string();
1473 let uuid = uuid::Uuid::new_v4().to_string();
1474 let table_name = "test_leader_action_greptime-metakv";
1475 let candidate_lease_ttl = Duration::from_secs(5);
1476 let meta_lease_ttl = Duration::from_secs(2);
1477 let execution_timeout = Duration::from_secs(10);
1478 let idle_session_timeout = Duration::from_secs(0);
1479 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1480 .await
1481 .unwrap();
1482
1483 let (tx, mut rx) = broadcast::channel(100);
1484 let leader_mysql_election = MySqlElection {
1485 leader_value: leader_value.clone(),
1486 client,
1487 is_leader: AtomicBool::new(false),
1488 leader_infancy: AtomicBool::new(true),
1489 leader_watcher: tx,
1490 store_key_prefix: uuid,
1491 candidate_lease_ttl,
1492 meta_lease_ttl,
1493 sql_set: ElectionSqlFactory::new(table_name).build(),
1494 };
1495
1496 leader_mysql_election.do_campaign().await.unwrap();
1498 let lease = get_lease(&leader_mysql_election).await.unwrap();
1499 assert_eq!(lease.leader_value, leader_value);
1500 assert!(lease.expire_time > lease.current);
1501 assert!(leader_mysql_election.is_leader());
1502
1503 match rx.recv().await {
1504 Ok(LeaderChangeMessage::Elected(key)) => {
1505 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1506 assert_eq!(
1507 String::from_utf8_lossy(key.key()),
1508 leader_mysql_election.election_key()
1509 );
1510 assert_eq!(key.lease_id(), i64::default());
1511 assert_eq!(key.revision(), i64::default());
1512 }
1513 _ => panic!("Expected LeaderChangeMessage::Elected"),
1514 }
1515
1516 leader_mysql_election.do_campaign().await.unwrap();
1518 let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1519 assert_eq!(lease.leader_value, leader_value);
1520 assert!(new_lease.expire_time > lease.expire_time);
1522 assert!(new_lease.expire_time > new_lease.current);
1523 assert!(leader_mysql_election.is_leader());
1524
1525 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1527 leader_mysql_election.do_campaign().await.unwrap();
1528 let lease = get_lease(&leader_mysql_election).await.unwrap();
1529 assert_eq!(lease.leader_value, leader_value);
1530 assert!(lease.expire_time <= lease.current);
1531 assert!(!leader_mysql_election.is_leader());
1532
1533 match rx.recv().await {
1534 Ok(LeaderChangeMessage::StepDown(key)) => {
1535 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1536 assert_eq!(
1537 String::from_utf8_lossy(key.key()),
1538 leader_mysql_election.election_key()
1539 );
1540 assert_eq!(key.lease_id(), i64::default());
1541 assert_eq!(key.revision(), i64::default());
1542 }
1543 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1544 }
1545
1546 leader_mysql_election.do_campaign().await.unwrap();
1548 let lease = get_lease(&leader_mysql_election).await.unwrap();
1549 assert_eq!(lease.leader_value, leader_value);
1550 assert!(lease.expire_time > lease.current);
1551 assert!(leader_mysql_election.is_leader());
1552
1553 match rx.recv().await {
1554 Ok(LeaderChangeMessage::Elected(key)) => {
1555 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1556 assert_eq!(
1557 String::from_utf8_lossy(key.key()),
1558 leader_mysql_election.election_key()
1559 );
1560 assert_eq!(key.lease_id(), i64::default());
1561 assert_eq!(key.revision(), i64::default());
1562 }
1563 _ => panic!("Expected LeaderChangeMessage::Elected"),
1564 }
1565
1566 {
1568 let client = leader_mysql_election.client.lock().await;
1569 let mut executor = Executor::Default(client);
1570 leader_mysql_election
1571 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1572 .await
1573 .unwrap();
1574 }
1575 leader_mysql_election.do_campaign().await.unwrap();
1576 let res = get_lease(&leader_mysql_election).await;
1577 assert!(res.is_none());
1578 assert!(!leader_mysql_election.is_leader());
1579
1580 match rx.recv().await {
1581 Ok(LeaderChangeMessage::StepDown(key)) => {
1582 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1583 assert_eq!(
1584 String::from_utf8_lossy(key.key()),
1585 leader_mysql_election.election_key()
1586 );
1587 assert_eq!(key.lease_id(), i64::default());
1588 assert_eq!(key.revision(), i64::default());
1589 }
1590 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1591 }
1592
1593 leader_mysql_election.do_campaign().await.unwrap();
1595 let lease = get_lease(&leader_mysql_election).await.unwrap();
1596 assert_eq!(lease.leader_value, leader_value);
1597 assert!(lease.expire_time > lease.current);
1598 assert!(leader_mysql_election.is_leader());
1599
1600 match rx.recv().await {
1601 Ok(LeaderChangeMessage::Elected(key)) => {
1602 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1603 assert_eq!(
1604 String::from_utf8_lossy(key.key()),
1605 leader_mysql_election.election_key()
1606 );
1607 assert_eq!(key.lease_id(), i64::default());
1608 assert_eq!(key.revision(), i64::default());
1609 }
1610 _ => panic!("Expected LeaderChangeMessage::Elected"),
1611 }
1612
1613 let another_leader_key = "another_leader";
1615 {
1616 let client = leader_mysql_election.client.lock().await;
1617 let mut executor = Executor::Default(client);
1618 leader_mysql_election
1619 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1620 .await
1621 .unwrap();
1622 leader_mysql_election
1623 .put_value_with_lease(
1624 &leader_mysql_election.election_key(),
1625 another_leader_key,
1626 10,
1627 &mut executor,
1628 )
1629 .await
1630 .unwrap();
1631 }
1632 leader_mysql_election.do_campaign().await.unwrap();
1633 let lease = get_lease(&leader_mysql_election).await.unwrap();
1634 assert_eq!(lease.leader_value, another_leader_key);
1636 assert!(lease.expire_time > lease.current);
1637 assert!(!leader_mysql_election.is_leader());
1638
1639 match rx.recv().await {
1640 Ok(LeaderChangeMessage::StepDown(key)) => {
1641 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1642 assert_eq!(
1643 String::from_utf8_lossy(key.key()),
1644 leader_mysql_election.election_key()
1645 );
1646 assert_eq!(key.lease_id(), i64::default());
1647 assert_eq!(key.revision(), i64::default());
1648 }
1649 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1650 }
1651
1652 drop_table(&leader_mysql_election.client, table_name).await;
1653 }
1654
1655 #[tokio::test]
1656 async fn test_reset_campaign() {
1657 maybe_skip_mysql_integration_test!();
1658 common_telemetry::init_default_ut_logging();
1659 let leader_value = "test_leader".to_string();
1660 let uuid = uuid::Uuid::new_v4().to_string();
1661 let table_name = "test_reset_campaign_greptime-metakv";
1662 let candidate_lease_ttl = Duration::from_secs(5);
1663 let meta_lease_ttl = Duration::from_secs(2);
1664 let execution_timeout = Duration::from_secs(10);
1665 let idle_session_timeout = Duration::from_secs(0);
1666 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1667 .await
1668 .unwrap();
1669
1670 let (tx, _) = broadcast::channel(100);
1671 let leader_mysql_election = MySqlElection {
1672 leader_value,
1673 client,
1674 is_leader: AtomicBool::new(false),
1675 leader_infancy: AtomicBool::new(true),
1676 leader_watcher: tx,
1677 store_key_prefix: uuid,
1678 candidate_lease_ttl,
1679 meta_lease_ttl,
1680 sql_set: ElectionSqlFactory::new(table_name).build(),
1681 };
1682 leader_mysql_election
1683 .is_leader
1684 .store(true, Ordering::Relaxed);
1685 leader_mysql_election.reset_campaign().await;
1686 assert!(!leader_mysql_election.is_leader());
1687 drop_table(&leader_mysql_election.client, table_name).await;
1688 }
1689
1690 #[tokio::test]
1691 async fn test_follower_action() {
1692 maybe_skip_mysql_integration_test!();
1693 common_telemetry::init_default_ut_logging();
1694 let candidate_lease_ttl = Duration::from_secs(5);
1695 let meta_lease_ttl = Duration::from_secs(1);
1696 let execution_timeout = Duration::from_secs(10);
1697 let idle_session_timeout = Duration::from_secs(0);
1698 let uuid = uuid::Uuid::new_v4().to_string();
1699 let table_name = "test_follower_action_greptime-metakv";
1700
1701 let follower_client =
1702 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1703 .await
1704 .unwrap();
1705 let (tx, mut rx) = broadcast::channel(100);
1706 let follower_mysql_election = MySqlElection {
1707 leader_value: "test_follower".to_string(),
1708 client: follower_client,
1709 is_leader: AtomicBool::new(false),
1710 leader_infancy: AtomicBool::new(true),
1711 leader_watcher: tx,
1712 store_key_prefix: uuid.clone(),
1713 candidate_lease_ttl,
1714 meta_lease_ttl,
1715 sql_set: ElectionSqlFactory::new(table_name).build(),
1716 };
1717
1718 let leader_client =
1719 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1720 .await
1721 .unwrap();
1722 let (tx, _) = broadcast::channel(100);
1723 let leader_mysql_election = MySqlElection {
1724 leader_value: "test_leader".to_string(),
1725 client: leader_client,
1726 is_leader: AtomicBool::new(false),
1727 leader_infancy: AtomicBool::new(true),
1728 leader_watcher: tx,
1729 store_key_prefix: uuid,
1730 candidate_lease_ttl,
1731 meta_lease_ttl,
1732 sql_set: ElectionSqlFactory::new(table_name).build(),
1733 };
1734
1735 leader_mysql_election.do_campaign().await.unwrap();
1736
1737 follower_mysql_election.do_campaign().await.unwrap();
1739
1740 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1742 follower_mysql_election.do_campaign().await.unwrap();
1743 assert!(follower_mysql_election.is_leader());
1744
1745 match rx.recv().await {
1746 Ok(LeaderChangeMessage::Elected(key)) => {
1747 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1748 assert_eq!(
1749 String::from_utf8_lossy(key.key()),
1750 follower_mysql_election.election_key()
1751 );
1752 assert_eq!(key.lease_id(), i64::default());
1753 assert_eq!(key.revision(), i64::default());
1754 }
1755 _ => panic!("Expected LeaderChangeMessage::Elected"),
1756 }
1757
1758 drop_table(&follower_mysql_election.client, table_name).await;
1759 }
1760
1761 #[tokio::test]
1762 async fn test_wait_timeout() {
1763 maybe_skip_mysql_integration_test!();
1764 common_telemetry::init_default_ut_logging();
1765 let execution_timeout = Duration::from_secs(10);
1766 let idle_session_timeout = Duration::from_secs(1);
1767
1768 let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1769 .await
1770 .unwrap();
1771 tokio::time::sleep(Duration::from_millis(1100)).await;
1772 let err = client
1774 .lock()
1775 .await
1776 .query(sqlx::query("SELECT 1"), "SELECT 1")
1777 .await
1778 .unwrap_err();
1779 assert_matches!(err, error::Error::MySqlExecution { .. });
1780 client.lock().await.reset_client().await.unwrap();
1782 let _ = client
1783 .lock()
1784 .await
1785 .query(sqlx::query("SELECT 1"), "SELECT 1")
1786 .await
1787 .unwrap();
1788 }
1789}