1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, info, warn};
21use common_time::Timestamp;
22use snafu::{OptionExt, ResultExt, ensure};
23use sqlx::mysql::{MySqlArguments, MySqlRow};
24use sqlx::pool::PoolConnection;
25use sqlx::query::Query;
26use sqlx::{MySql, MySqlPool, MySqlTransaction, Row};
27use tokio::sync::{Mutex, MutexGuard, broadcast};
28use tokio::time::MissedTickBehavior;
29
30use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
31use crate::election::{
32 Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
33};
34use crate::error::{
35 AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
36 LeaderLeaseChangedSnafu, LeaderLeaseExpiredSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result,
37 SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
38};
39use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
40
/// Renders the SQL statements used by the MySQL election against one table.
struct ElectionSqlFactory<'a> {
    /// Name of the table every generated statement operates on.
    table_name: &'a str,
}
44
/// Pre-rendered SQL statements for one election table.
///
/// Each field holds the complete statement text; `?` placeholders are bound
/// at execution time.
struct ElectionSqlSet {
    /// Statement executed inside a transaction before renewing or taking the lease.
    campaign: String,
    /// Statement that stores a value together with a lease expiry.
    put_value_with_lease: String,
    /// Statement that replaces a stored value and its lease expiry.
    update_value_with_lease: String,
    /// Statement that reads the value stored under a single key.
    get_value_with_lease: String,
    /// Statement that reads the values of all keys matching a `LIKE` pattern.
    get_value_with_lease_by_prefix: String,
    /// Statement that removes the row stored under a single key.
    delete_value: String,
}
88
89impl<'a> ElectionSqlFactory<'a> {
90 fn new(table_name: &'a str) -> Self {
91 Self { table_name }
92 }
93
94 fn build(self) -> ElectionSqlSet {
95 ElectionSqlSet {
96 campaign: self.campaign_sql(),
97 put_value_with_lease: self.put_value_with_lease_sql(),
98 update_value_with_lease: self.update_value_with_lease_sql(),
99 get_value_with_lease: self.get_value_with_lease_sql(),
100 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
101 delete_value: self.delete_value_sql(),
102 }
103 }
104
105 fn campaign_sql(&self) -> String {
108 format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
109 }
110
111 fn put_value_with_lease_sql(&self) -> String {
112 format!(
113 r#"
114 INSERT INTO `{}` (k, v) VALUES (
115 ?,
116 CONCAT(
117 ?,
118 '{}',
119 DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
120 )
121 )
122 ON DUPLICATE KEY UPDATE v = VALUES(v);
123 "#,
124 self.table_name, LEASE_SEP
125 )
126 }
127
128 fn update_value_with_lease_sql(&self) -> String {
129 format!(
130 r#"UPDATE `{}`
131 SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
132 WHERE k = ? AND v = ?"#,
133 self.table_name, LEASE_SEP
134 )
135 }
136
137 fn get_value_with_lease_sql(&self) -> String {
138 format!(
139 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
140 self.table_name
141 )
142 }
143
144 fn get_value_with_lease_by_prefix_sql(&self) -> String {
145 format!(
146 r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
147 self.table_name
148 )
149 }
150
151 fn delete_value_sql(&self) -> String {
152 format!("DELETE FROM {} WHERE k = ?;", self.table_name)
153 }
154}
155
/// Where a statement is executed: directly on the locked client, or inside
/// an open transaction.
enum Executor<'a> {
    /// Runs statements on the locked [`ElectionMysqlClient`].
    Default(MutexGuard<'a, ElectionMysqlClient>),
    /// Runs statements inside a transaction that must be committed explicitly.
    Txn(TransactionWithExecutionTimeout<'a>),
}
160
161impl Executor<'_> {
162 async fn query(
163 &mut self,
164 query: Query<'_, MySql, MySqlArguments>,
165 sql: &str,
166 ) -> Result<Vec<MySqlRow>> {
167 match self {
168 Executor::Default(client) => client.query(query, sql).await,
169 Executor::Txn(txn) => txn.query(query, sql).await,
170 }
171 }
172
173 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
174 match self {
175 Executor::Default(client) => client.execute(query, sql).await,
176 Executor::Txn(txn) => txn.execute(query, sql).await,
177 }
178 }
179
180 async fn commit(self) -> Result<()> {
181 match self {
182 Executor::Txn(txn) => txn.commit().await,
183 _ => Ok(()),
184 }
185 }
186}
187
/// MySQL client used by the election; holds at most one pooled connection.
pub struct ElectionMysqlClient {
    /// Connection currently in use; `None` until the client is initialized.
    current: Option<PoolConnection<MySql>>,
    /// Pool from which `current` is acquired.
    pool: MySqlPool,

    /// Client-side timeout applied to every statement.
    execution_timeout: Duration,

    /// Bound (in milliseconds) to the session variable `max_execution_time`.
    max_execution_time: Duration,

    /// Bound (in seconds) to the session variable `innodb_lock_wait_timeout`.
    innode_lock_wait_timeout: Duration,

    /// Bound (in seconds) to the session variable `wait_timeout`; a zero
    /// duration leaves that variable untouched.
    wait_timeout: Duration,

    /// Name of the table used for the election.
    table_for_election: String,
}
221
222impl ElectionMysqlClient {
223 pub fn new(
224 pool: MySqlPool,
225 execution_timeout: Duration,
226 max_execution_time: Duration,
227 innode_lock_wait_timeout: Duration,
228 wait_timeout: Duration,
229 table_for_election: &str,
230 ) -> Self {
231 Self {
232 current: None,
233 pool,
234 execution_timeout,
235 max_execution_time,
236 innode_lock_wait_timeout,
237 wait_timeout,
238 table_for_election: table_for_election.to_string(),
239 }
240 }
241
242 fn create_table_sql(&self) -> String {
243 format!(
244 r#"
245 CREATE TABLE IF NOT EXISTS `{}` (
246 k VARBINARY(3072) PRIMARY KEY,
247 v BLOB
248 );"#,
249 self.table_for_election
250 )
251 }
252
253 fn insert_once_sql(&self) -> String {
254 format!(
255 "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
256 self.table_for_election
257 )
258 }
259
260 fn check_version_sql(&self) -> String {
261 "SELECT @@version;".to_string()
262 }
263
264 async fn reset_client(&mut self) -> Result<()> {
265 self.current = None;
266 self.maybe_init_client().await
267 }
268
269 async fn ensure_table_exists(&mut self) -> Result<()> {
270 let create_table_sql = self.create_table_sql();
271 let query = sqlx::query(&create_table_sql);
272 self.execute(query, &create_table_sql).await?;
273 let insert_once_sql = self.insert_once_sql();
275 let query = sqlx::query(&insert_once_sql);
276 self.execute(query, &insert_once_sql).await?;
277 Ok(())
278 }
279
280 async fn maybe_init_client(&mut self) -> Result<()> {
281 if self.current.is_none() {
282 let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?;
283
284 self.current = Some(client);
285 let (query, sql) = if !self.wait_timeout.is_zero() {
286 let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;";
287 (
288 sqlx::query(sql)
289 .bind(self.wait_timeout.as_secs())
290 .bind(self.innode_lock_wait_timeout.as_secs())
291 .bind(self.max_execution_time.as_millis() as u64),
292 sql,
293 )
294 } else {
295 let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;";
296 (
297 sqlx::query(sql)
298 .bind(self.innode_lock_wait_timeout.as_secs())
299 .bind(self.max_execution_time.as_millis() as u64),
300 sql,
301 )
302 };
303 self.set_session_isolation_level().await?;
304 self.execute(query, sql).await?;
305 self.check_version(&self.check_version_sql()).await?;
306 }
307 Ok(())
308 }
309
310 async fn set_session_isolation_level(&mut self) -> Result<()> {
315 let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
316 let query = sqlx::query(sql);
317 self.execute(query, sql).await?;
318 Ok(())
319 }
320
321 async fn check_version(&mut self, sql: &str) -> Result<()> {
326 let query = sqlx::query(sql);
328 let client = self.current.as_mut().unwrap();
330 let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client))
331 .await
332 .map_err(|_| {
333 SqlExecutionTimeoutSnafu {
334 sql,
335 duration: self.execution_timeout,
336 }
337 .build()
338 })?;
339 match row {
340 Ok(row) => {
341 let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?;
342 if !version.starts_with("8.0") && !version.starts_with("5.7") {
343 warn!(
344 "Unsupported MySQL version: {}, expected: [5.7, 8.0]",
345 version
346 );
347 }
348 }
349 Err(e) => {
350 warn!(e; "Failed to check MySQL version through sql: {}", sql);
351 }
352 }
353 Ok(())
354 }
355
356 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
361 let client = self.current.as_mut().unwrap();
363 let future = query.execute(&mut **client);
364 let rows_affected = tokio::time::timeout(self.execution_timeout, future)
365 .await
366 .map_err(|_| {
367 SqlExecutionTimeoutSnafu {
368 sql,
369 duration: self.execution_timeout,
370 }
371 .build()
372 })?
373 .context(MySqlExecutionSnafu { sql })?
374 .rows_affected();
375 Ok(rows_affected)
376 }
377
378 async fn query(
383 &mut self,
384 query: Query<'_, MySql, MySqlArguments>,
385 sql: &str,
386 ) -> Result<Vec<MySqlRow>> {
387 let client = self.current.as_mut().unwrap();
389 let future = query.fetch_all(&mut **client);
390 tokio::time::timeout(self.execution_timeout, future)
391 .await
392 .map_err(|_| {
393 SqlExecutionTimeoutSnafu {
394 sql,
395 duration: self.execution_timeout,
396 }
397 .build()
398 })?
399 .context(MySqlExecutionSnafu { sql })
400 }
401
402 async fn transaction(&mut self) -> Result<TransactionWithExecutionTimeout<'_>> {
403 use sqlx::Acquire;
404 let client = self.current.as_mut().unwrap();
405 let transaction = client
406 .begin()
407 .await
408 .context(MySqlExecutionSnafu { sql: "BEGIN" })?;
409
410 Ok(TransactionWithExecutionTimeout {
411 transaction,
412 execution_timeout: self.execution_timeout,
413 })
414 }
415}
416
/// A MySQL transaction whose statements are each bounded by a client-side timeout.
struct TransactionWithExecutionTimeout<'a> {
    /// The open transaction.
    transaction: MySqlTransaction<'a>,
    /// Maximum time allowed for each statement, including `COMMIT`.
    execution_timeout: Duration,
}
421
422impl TransactionWithExecutionTimeout<'_> {
423 async fn query(
424 &mut self,
425 query: Query<'_, MySql, MySqlArguments>,
426 sql: &str,
427 ) -> Result<Vec<MySqlRow>> {
428 let res = tokio::time::timeout(
429 self.execution_timeout,
430 query.fetch_all(&mut *self.transaction),
431 )
432 .await
433 .map_err(|_| {
434 SqlExecutionTimeoutSnafu {
435 sql,
436 duration: self.execution_timeout,
437 }
438 .build()
439 })?
440 .context(MySqlExecutionSnafu { sql })?;
441 Ok(res)
442 }
443
444 async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
445 let res = tokio::time::timeout(
446 self.execution_timeout,
447 query.execute(&mut *self.transaction),
448 )
449 .await
450 .map_err(|_| {
451 SqlExecutionTimeoutSnafu {
452 sql,
453 duration: self.execution_timeout,
454 }
455 .build()
456 })?
457 .context(MySqlExecutionSnafu { sql })?;
458 Ok(res.rows_affected())
459 }
460
461 async fn commit(self) -> Result<()> {
462 tokio::time::timeout(self.execution_timeout, self.transaction.commit())
463 .await
464 .map_err(|_| {
465 SqlExecutionTimeoutSnafu {
466 sql: "COMMIT",
467 duration: self.execution_timeout,
468 }
469 .build()
470 })?
471 .context(MySqlExecutionSnafu { sql: "COMMIT" })?;
472 Ok(())
473 }
474}
475
/// Leader election backed by a MySQL table.
pub struct MySqlElection {
    /// Value this node writes when it holds the lease; also used as the
    /// suffix of its candidate key.
    leader_value: String,
    /// Client shared by all election operations, guarded by an async mutex.
    client: Mutex<ElectionMysqlClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Flag consumed (reset to `false`) by `in_leader_infancy`.
    leader_infancy: AtomicBool,
    /// Broadcast channel on which leader changes are published.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidates root.
    store_key_prefix: String,
    /// Lease duration of a candidate registration.
    candidate_lease_ttl: Duration,
    /// Lease duration of the leader key.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements for the election table.
    sql_set: ElectionSqlSet,
}
488
489impl MySqlElection {
490 pub async fn with_mysql_client(
491 leader_value: String,
492 mut client: ElectionMysqlClient,
493 store_key_prefix: String,
494 candidate_lease_ttl: Duration,
495 meta_lease_ttl: Duration,
496 table_name: &str,
497 ) -> Result<ElectionRef> {
498 let sql_factory = ElectionSqlFactory::new(table_name);
499 client.maybe_init_client().await?;
500 client.ensure_table_exists().await?;
501 let tx = listen_leader_change(leader_value.clone());
502 Ok(Arc::new(Self {
503 leader_value,
504 client: Mutex::new(client),
505 is_leader: AtomicBool::new(false),
506 leader_infancy: AtomicBool::new(false),
507 leader_watcher: tx,
508 store_key_prefix,
509 candidate_lease_ttl,
510 meta_lease_ttl,
511 sql_set: sql_factory.build(),
512 }))
513 }
514
515 fn election_key(&self) -> String {
516 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
517 }
518
519 fn candidate_root(&self) -> String {
520 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
521 }
522
523 fn candidate_key(&self) -> String {
524 format!("{}{}", self.candidate_root(), self.leader_value)
525 }
526
527 async fn maybe_init_client(&self) -> Result<()> {
528 let mut client = self.client.lock().await;
529 client.maybe_init_client().await?;
530 Ok(())
531 }
532}
533
534#[async_trait::async_trait]
535impl Election for MySqlElection {
536 type Leader = LeaderValue;
537
538 fn is_leader(&self) -> bool {
539 self.is_leader.load(Ordering::Relaxed)
540 }
541
542 fn in_leader_infancy(&self) -> bool {
543 self.leader_infancy
544 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
545 .is_ok()
546 }
547
548 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
549 self.maybe_init_client().await?;
550 let key = self.candidate_key();
551 let node_info =
552 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
553 input: format!("{node_info:?}"),
554 })?;
555
556 {
557 let client = self.client.lock().await;
558 let mut executor = Executor::Default(client);
559 let res = self
560 .put_value_with_lease(
561 &key,
562 &node_info,
563 self.candidate_lease_ttl.as_secs(),
564 &mut executor,
565 )
566 .await?;
567 if !res {
569 warn!("Candidate already registered, update the lease");
570 self.delete_value(&key, &mut executor).await?;
571 self.put_value_with_lease(
572 &key,
573 &node_info,
574 self.candidate_lease_ttl.as_secs(),
575 &mut executor,
576 )
577 .await?;
578 }
579 }
580
581 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
583 loop {
584 let _ = keep_alive_interval.tick().await;
585 let client = self.client.lock().await;
586 let mut executor = Executor::Default(client);
587 let lease = self
588 .get_value_with_lease(&key, &mut executor)
589 .await?
590 .unwrap_or_default();
591
592 ensure!(
593 lease.expire_time > lease.current,
594 UnexpectedSnafu {
595 violated: format!(
596 "Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
597 lease.expire_time,
598 lease.current,
599 String::from_utf8_lossy(&key.into_bytes())
600 ),
601 }
602 );
603
604 self.update_value_with_lease(
605 &key,
606 &lease.origin,
607 &node_info,
608 self.candidate_lease_ttl.as_secs(),
609 &mut executor,
610 )
611 .await?;
612 std::mem::drop(executor);
613 }
614 }
615
616 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
617 self.maybe_init_client().await?;
618 let key_prefix = self.candidate_root();
619 let client = self.client.lock().await;
620 let mut executor = Executor::Default(client);
621 let (mut candidates, current) = self
622 .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
623 .await?;
624 candidates.retain(|c| c.1 > current);
626 let mut valid_candidates = Vec::with_capacity(candidates.len());
627 for (c, _) in candidates {
628 let node_info: MetasrvNodeInfo =
629 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
630 input: format!("{:?}", c),
631 })?;
632 valid_candidates.push(node_info);
633 }
634 Ok(valid_candidates)
635 }
636
637 async fn campaign(&self) -> Result<()> {
638 self.maybe_init_client().await?;
639 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
640 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
641 loop {
642 self.do_campaign().await?;
643 keep_alive_interval.tick().await;
644 }
645 }
646
647 async fn reset_campaign(&self) {
648 info!("Resetting campaign");
649 if self.is_leader.load(Ordering::Relaxed) {
650 if let Err(err) = self.step_down_without_lock().await {
651 error!(err; "Failed to step down without lock");
652 }
653 info!("Step down without lock successfully, due to reset campaign");
654 }
655 if let Err(err) = self.client.lock().await.reset_client().await {
656 error!(err; "Failed to reset client");
657 }
658 }
659
660 async fn leader(&self) -> Result<Self::Leader> {
661 self.maybe_init_client().await?;
662 if self.is_leader.load(Ordering::Relaxed) {
663 Ok(self.leader_value.as_bytes().into())
664 } else {
665 let key = self.election_key();
666
667 let client = self.client.lock().await;
668 let mut executor = Executor::Default(client);
669 if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
670 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
671 Ok(lease.leader_value.as_bytes().into())
672 } else {
673 NoLeaderSnafu.fail()
674 }
675 }
676 }
677
678 async fn resign(&self) -> Result<()> {
679 todo!()
680 }
681
682 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
683 self.leader_watcher.subscribe()
684 }
685}
686
687impl MySqlElection {
688 async fn get_value_with_lease(
690 &self,
691 key: &str,
692 executor: &mut Executor<'_>,
693 ) -> Result<Option<Lease>> {
694 let key = key.as_bytes();
695 let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
696 let res = executor
697 .query(query, &self.sql_set.get_value_with_lease)
698 .await?;
699
700 if res.is_empty() {
701 return Ok(None);
702 }
703 let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
705 let current_time = match Timestamp::from_str(¤t_time_str, None) {
706 Ok(ts) => ts,
707 Err(_) => UnexpectedSnafu {
708 violated: format!("Invalid timestamp: {}", current_time_str),
709 }
710 .fail()?,
711 };
712 let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
714 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
715
716 Ok(Some(Lease {
717 leader_value: value,
718 expire_time,
719 current: current_time,
720 origin: value_and_expire_time.to_string(),
721 }))
722 }
723
724 async fn get_value_with_lease_by_prefix(
726 &self,
727 key_prefix: &str,
728 executor: &mut Executor<'_>,
729 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
730 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
731 let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
732 let res = executor
733 .query(query, &self.sql_set.get_value_with_lease_by_prefix)
734 .await?;
735
736 let mut values_with_leases = vec![];
737 let mut current = Timestamp::default();
738 for row in res {
739 let current_time_str = row.try_get(1).unwrap_or_default();
740 current = match Timestamp::from_str(current_time_str, None) {
741 Ok(ts) => ts,
742 Err(_) => UnexpectedSnafu {
743 violated: format!("Invalid timestamp: {}", current_time_str),
744 }
745 .fail()?,
746 };
747
748 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
749 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
750
751 values_with_leases.push((value, expire_time));
752 }
753 Ok((values_with_leases, current))
754 }
755
756 async fn update_value_with_lease(
757 &self,
758 key: &str,
759 prev: &str,
760 updated: &str,
761 lease_ttl: u64,
762 executor: &mut Executor<'_>,
763 ) -> Result<()> {
764 let key = key.as_bytes();
765 let prev = prev.as_bytes();
766 let updated = updated.as_bytes();
767
768 let query = sqlx::query(&self.sql_set.update_value_with_lease)
769 .bind(updated)
770 .bind(lease_ttl as f64)
771 .bind(key)
772 .bind(prev);
773 let res = executor
774 .execute(query, &self.sql_set.update_value_with_lease)
775 .await?;
776
777 ensure!(
778 res == 1,
779 UnexpectedSnafu {
780 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
781 }
782 );
783
784 Ok(())
785 }
786
787 async fn put_value_with_lease(
789 &self,
790 key: &str,
791 value: &str,
792 lease_ttl_secs: u64,
793 executor: &mut Executor<'_>,
794 ) -> Result<bool> {
795 let key = key.as_bytes();
796 let lease_ttl_secs = lease_ttl_secs as f64;
797 let query = sqlx::query(&self.sql_set.put_value_with_lease)
798 .bind(key)
799 .bind(value)
800 .bind(lease_ttl_secs);
801 let res = executor
802 .execute(query, &self.sql_set.put_value_with_lease)
803 .await?;
804 Ok(res == 1)
805 }
806
807 async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
810 let key = key.as_bytes();
811 let query = sqlx::query(&self.sql_set.delete_value).bind(key);
812 let res = executor.execute(query, &self.sql_set.delete_value).await?;
813
814 Ok(res == 1)
815 }
816
    /// Runs one round of the campaign.
    ///
    /// Reads the current leader lease, then acts on three facts: whether
    /// the lease is present and unexpired (`lease_check`), whether this node
    /// believes it is the leader (`is_leader`), and whether the stored
    /// leader value equals this node's value (`is_current_leader`).
    async fn do_campaign(&self) -> Result<()> {
        // Read the lease in its own scope so the client lock is released
        // before any branch below locks the client again.
        let lease = {
            let client = self.client.lock().await;
            let mut executor = Executor::Default(client);
            self.get_value_with_lease(&self.election_key(), &mut executor)
                .await?
        };

        let is_leader = self.is_leader();
        let is_current_leader = lease
            .as_ref()
            .map(|lease| lease.leader_value == self.leader_value)
            .unwrap_or(false);
        match (self.lease_check(&lease), is_leader, is_current_leader) {
            // Valid lease held by this node, which knows it is the leader:
            // run the campaign statement in a transaction and renew the lease.
            (Ok(_), true, true) => {
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                self.renew_lease(executor, lease.unwrap()).await?;
            }
            // This node believes it is the leader but the lease is missing
            // or expired: step down.
            (Err(err), true, _) => {
                warn!(err; "Leader lease expired, step down...");
                self.step_down_without_lock().await?;
            }
            // This node believes it is the leader but the valid lease
            // carries a different leader value: step down.
            (Ok(_), true, false) => {
                warn!("Leader lease expired, step down...");
                self.step_down_without_lock().await?;
            }
            // No valid lease and this node is not the leader: try to take
            // the lease inside a transaction.
            (Err(err), false, _) => {
                warn!(err; "Leader lease expired, elect myself.");
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                self.elected(executor, lease).await?;
            }
            // The valid lease names this node although its local flag says
            // otherwise: renew the lease (`renew_lease` also publishes the
            // election when the local flag is unset).
            (Ok(_), false, true) => {
                warn!("I should be the leader, but I don't think so. Something went wrong.");
                let mut client = self.client.lock().await;
                let txn = client.transaction().await?;
                let mut executor = Executor::Txn(txn);
                let query = sqlx::query(&self.sql_set.campaign);
                executor.query(query, &self.sql_set.campaign).await?;
                self.renew_lease(executor, lease.unwrap()).await?;
            }
            // Another node holds a valid lease: nothing to do.
            (Ok(_), false, false) => {}
        }
        Ok(())
    }
882
883 async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
885 let key = self.election_key();
886 self.update_value_with_lease(
887 &key,
888 &lease.origin,
889 &self.leader_value,
890 self.meta_lease_ttl.as_secs(),
891 &mut executor,
892 )
893 .await?;
894 executor.commit().await?;
895
896 if !self.is_leader() {
897 let key = self.election_key();
898 let leader_key = RdsLeaderKey {
899 name: self.leader_value.clone().into_bytes(),
900 key: key.clone().into_bytes(),
901 ..Default::default()
902 };
903 send_leader_change_and_set_flags(
904 &self.is_leader,
905 &self.leader_infancy,
906 &self.leader_watcher,
907 LeaderChangeMessage::Elected(Arc::new(leader_key)),
908 );
909 }
910
911 Ok(())
912 }
913
914 fn lease_check(&self, lease: &Option<Lease>) -> Result<Lease> {
923 let lease = lease.as_ref().context(NoLeaderSnafu)?;
924 ensure!(lease.expire_time > lease.current, LeaderLeaseExpiredSnafu);
926 Ok(lease.clone())
928 }
929
930 async fn step_down_without_lock(&self) -> Result<()> {
932 let key = self.election_key().into_bytes();
933 let leader_key = RdsLeaderKey {
934 name: self.leader_value.clone().into_bytes(),
935 key: key.clone(),
936 ..Default::default()
937 };
938 send_leader_change_and_set_flags(
939 &self.is_leader,
940 &self.leader_infancy,
941 &self.leader_watcher,
942 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
943 );
944 Ok(())
945 }
946
947 async fn elected(
950 &self,
951 mut executor: Executor<'_>,
952 expected_lease: Option<Lease>,
953 ) -> Result<()> {
954 let key = self.election_key();
955 let leader_key = RdsLeaderKey {
956 name: self.leader_value.clone().into_bytes(),
957 key: key.clone().into_bytes(),
958 ..Default::default()
959 };
960 let remote_lease = self.get_value_with_lease(&key, &mut executor).await?;
961 ensure!(
962 expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin),
963 LeaderLeaseChangedSnafu
964 );
965 self.delete_value(&key, &mut executor).await?;
966 self.put_value_with_lease(
967 &key,
968 &self.leader_value,
969 self.meta_lease_ttl.as_secs(),
970 &mut executor,
971 )
972 .await?;
973 executor.commit().await?;
974
975 send_leader_change_and_set_flags(
976 &self.is_leader,
977 &self.leader_infancy,
978 &self.leader_watcher,
979 LeaderChangeMessage::Elected(Arc::new(leader_key)),
980 );
981 Ok(())
982 }
983}
984
985#[cfg(test)]
986mod tests {
987 use std::assert_matches::assert_matches;
988 use std::env;
989
990 use common_meta::maybe_skip_mysql_integration_test;
991 use common_telemetry::init_default_ut_logging;
992
993 use super::*;
994 use crate::error;
995 use crate::utils::mysql::create_mysql_pool;
996
997 async fn create_mysql_client(
998 table_name: Option<&str>,
999 execution_timeout: Duration,
1000 wait_timeout: Duration,
1001 ) -> Result<Mutex<ElectionMysqlClient>> {
1002 init_default_ut_logging();
1003 let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
1004 if endpoint.is_empty() {
1005 return UnexpectedSnafu {
1006 violated: "MySQL endpoint is empty".to_string(),
1007 }
1008 .fail();
1009 }
1010 let pool = create_mysql_pool(&[endpoint], None).await.unwrap();
1011 let mut client = ElectionMysqlClient::new(
1012 pool,
1013 execution_timeout,
1014 execution_timeout,
1015 Duration::from_secs(1),
1016 wait_timeout,
1017 table_name.unwrap_or("default_greptime_metakv_election"),
1018 );
1019 client.maybe_init_client().await?;
1020 if table_name.is_some() {
1021 client.ensure_table_exists().await?;
1022 }
1023 Ok(Mutex::new(client))
1024 }
1025
1026 async fn drop_table(client: &Mutex<ElectionMysqlClient>, table_name: &str) {
1027 let mut client = client.lock().await;
1028 let sql = format!("DROP TABLE IF EXISTS {};", table_name);
1029 client.execute(sqlx::query(&sql), &sql).await.unwrap();
1030 }
1031
    /// Exercises put / get / update / delete and prefix reads against a real
    /// MySQL instance.
    #[tokio::test]
    async fn test_mysql_crud() {
        maybe_skip_mysql_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_mysql_crud_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);

        let execution_timeout = Duration::from_secs(10);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        // Run the locking select inside a transaction that is dropped
        // without being committed.
        {
            let mut a = client.lock().await;
            let txn = a.transaction().await.unwrap();
            let mut executor = Executor::Txn(txn);
            let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
            let query = sqlx::query(&raw_query);
            let _ = executor.query(query, &raw_query).await.unwrap();
        }

        let (tx, _) = broadcast::channel(100);
        let mysql_election = MySqlElection {
            leader_value: "test_leader".to_string(),
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        // A put of a new key reports a fresh insert.
        let res = mysql_election
            .put_value_with_lease(&key, &value, 10, &mut executor)
            .await
            .unwrap();
        assert!(res);

        let lease = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        // Updating with the value just read must succeed.
        mysql_election
            .update_value_with_lease(&key, &lease.origin, &value, 10, &mut executor)
            .await
            .unwrap();

        let res = mysql_election
            .delete_value(&key, &mut executor)
            .await
            .unwrap();
        assert!(res);

        // The key is gone after the delete.
        let res = mysql_election
            .get_value_with_lease(&key, &mut executor)
            .await
            .unwrap();
        assert!(res.is_none());

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            mysql_election
                .put_value_with_lease(&key, &value, 10, &mut executor)
                .await
                .unwrap();
        }

        // All ten keys share the prefix.
        let key_prefix = "test_key".to_string();
        let (res, _) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // With no matching rows the returned time stays at its default.
        let (res, current) = mysql_election
            .get_value_with_lease_by_prefix(&key_prefix, &mut executor)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());

        // Release the client lock before `drop_table` locks it again.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1138
1139 async fn candidate(
1140 leader_value: String,
1141 candidate_lease_ttl: Duration,
1142 store_key_prefix: String,
1143 table_name: String,
1144 ) {
1145 let meta_lease_ttl = Duration::from_secs(2);
1146 let execution_timeout = Duration::from_secs(10);
1147 let idle_session_timeout = Duration::from_secs(0);
1148 let client =
1149 create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout)
1150 .await
1151 .unwrap();
1152
1153 let (tx, _) = broadcast::channel(100);
1154 let mysql_election = MySqlElection {
1155 leader_value,
1156 client,
1157 is_leader: AtomicBool::new(false),
1158 leader_infancy: AtomicBool::new(true),
1159 leader_watcher: tx,
1160 store_key_prefix,
1161 candidate_lease_ttl,
1162 meta_lease_ttl,
1163 sql_set: ElectionSqlFactory::new(&table_name).build(),
1164 };
1165
1166 let node_info = MetasrvNodeInfo {
1167 addr: "test_addr".to_string(),
1168 version: "test_version".to_string(),
1169 git_commit: "test_git_commit".to_string(),
1170 start_time_ms: 0,
1171 total_cpu_millicores: 0,
1172 total_memory_bytes: 0,
1173 cpu_usage_millicores: 0,
1174 memory_usage_bytes: 0,
1175 hostname: "test_hostname".to_string(),
1176 };
1177 mysql_election.register_candidate(&node_info).await.unwrap();
1178 }
1179
    /// Spawns ten candidates, checks that all are visible, then aborts them
    /// and checks that they disappear once their leases run out.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_mysql_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let candidate_lease_ttl = Duration::from_secs(2);
        let execution_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime_metakv";
        let mut handles = vec![];
        let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
            .await
            .unwrap();

        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Give the spawned candidates time to register.
        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let mysql_election = MySqlElection {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid.clone(),
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(table_name).build(),
        };

        let candidates = mysql_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        // Stop the keep-alive loops so the leases are no longer refreshed.
        for handle in handles {
            handle.abort();
        }

        // Wait longer than the lease TTL so every lease has expired.
        tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await;
        let candidates = mysql_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // The expired rows still exist and are removed explicitly.
        let client = mysql_election.client.lock().await;
        let mut executor = Executor::Default(client);
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = mysql_election
                .delete_value(&key, &mut executor)
                .await
                .unwrap();
            assert!(res);
        }

        // Release the client lock before `drop_table` locks it again.
        std::mem::drop(executor);
        drop_table(&mysql_election.client, table_name).await;
    }
1250
1251 async fn elected(
1252 election: &MySqlElection,
1253 table_name: &str,
1254 expected_lease: Option<Lease>,
1255 ) -> Result<()> {
1256 let mut client = election.client.lock().await;
1257 let txn = client.transaction().await.unwrap();
1258 let mut executor = Executor::Txn(txn);
1259 let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name);
1260 let query = sqlx::query(&raw_query);
1261 let _ = executor.query(query, &raw_query).await.unwrap();
1262 election.elected(executor, expected_lease).await
1263 }
1264
1265 async fn get_lease(election: &MySqlElection) -> Option<Lease> {
1266 let client = election.client.lock().await;
1267 let mut executor = Executor::Default(client);
1268 election
1269 .get_value_with_lease(&election.election_key(), &mut executor)
1270 .await
1271 .unwrap()
1272 }
1273
1274 #[tokio::test]
1275 async fn test_elected_with_incorrect_lease_fails() {
1276 maybe_skip_mysql_integration_test!();
1277 let leader_value = "test_leader".to_string();
1278 let candidate_lease_ttl = Duration::from_secs(5);
1279 let meta_lease_ttl = Duration::from_secs(2);
1280 let execution_timeout = Duration::from_secs(10);
1281 let idle_session_timeout = Duration::from_secs(0);
1282 let uuid = uuid::Uuid::new_v4().to_string();
1283 let table_name = "test_elected_failed_greptime_metakv";
1284 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1285 .await
1286 .unwrap();
1287
1288 let (tx, _) = broadcast::channel(100);
1289 let leader_mysql_election = MySqlElection {
1290 leader_value: leader_value.clone(),
1291 client,
1292 is_leader: AtomicBool::new(false),
1293 leader_infancy: AtomicBool::new(true),
1294 leader_watcher: tx,
1295 store_key_prefix: uuid,
1296 candidate_lease_ttl,
1297 meta_lease_ttl,
1298 sql_set: ElectionSqlFactory::new(table_name).build(),
1299 };
1300
1301 let incorrect_lease = Lease::default();
1302 let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease))
1303 .await
1304 .unwrap_err();
1305 assert_matches!(err, error::Error::LeaderLeaseChanged { .. });
1306 let lease = get_lease(&leader_mysql_election).await;
1307 assert!(lease.is_none());
1308 drop_table(&leader_mysql_election.client, table_name).await;
1309 }
1310
1311 #[tokio::test]
1312 async fn test_reelection_with_idle_session_timeout() {
1313 maybe_skip_mysql_integration_test!();
1314 let leader_value = "test_leader".to_string();
1315 let uuid = uuid::Uuid::new_v4().to_string();
1316 let table_name = "test_reelection_greptime_metakv";
1317 let candidate_lease_ttl = Duration::from_secs(5);
1318 let meta_lease_ttl = Duration::from_secs(5);
1319 let execution_timeout = Duration::from_secs(10);
1320 let idle_session_timeout = Duration::from_secs(2);
1321 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1322 .await
1323 .unwrap();
1324
1325 let (tx, _) = broadcast::channel(100);
1326 let leader_mysql_election = MySqlElection {
1327 leader_value: leader_value.clone(),
1328 client,
1329 is_leader: AtomicBool::new(false),
1330 leader_infancy: AtomicBool::new(true),
1331 leader_watcher: tx,
1332 store_key_prefix: uuid,
1333 candidate_lease_ttl,
1334 meta_lease_ttl,
1335 sql_set: ElectionSqlFactory::new(table_name).build(),
1336 };
1337
1338 elected(&leader_mysql_election, table_name, None)
1339 .await
1340 .unwrap();
1341 let lease = get_lease(&leader_mysql_election).await.unwrap();
1342 assert_eq!(lease.leader_value, leader_value);
1343 assert!(lease.expire_time > lease.current);
1344 assert!(leader_mysql_election.is_leader());
1345 tokio::time::sleep(Duration::from_millis(2100)).await;
1347 leader_mysql_election
1349 .client
1350 .lock()
1351 .await
1352 .query(sqlx::query("SELECT 1"), "SELECT 1")
1353 .await
1354 .unwrap_err();
1355 leader_mysql_election
1357 .client
1358 .lock()
1359 .await
1360 .reset_client()
1361 .await
1362 .unwrap();
1363
1364 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1366 .await
1367 .unwrap();
1368 let lease = get_lease(&leader_mysql_election).await.unwrap();
1369 assert_eq!(lease.leader_value, leader_value);
1370 assert!(lease.expire_time > lease.current);
1371 assert!(leader_mysql_election.is_leader());
1372 drop_table(&leader_mysql_election.client, table_name).await;
1373 }
1374
1375 #[tokio::test]
1376 async fn test_elected_and_step_down() {
1377 maybe_skip_mysql_integration_test!();
1378 let leader_value = "test_leader".to_string();
1379 let candidate_lease_ttl = Duration::from_secs(5);
1380 let meta_lease_ttl = Duration::from_secs(2);
1381 let execution_timeout = Duration::from_secs(10);
1382 let idle_session_timeout = Duration::from_secs(0);
1383 let uuid = uuid::Uuid::new_v4().to_string();
1384 let table_name = "test_elected_and_step_down_greptime_metakv";
1385 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1386 .await
1387 .unwrap();
1388
1389 let (tx, mut rx) = broadcast::channel(100);
1390 let leader_mysql_election = MySqlElection {
1391 leader_value: leader_value.clone(),
1392 client,
1393 is_leader: AtomicBool::new(false),
1394 leader_infancy: AtomicBool::new(true),
1395 leader_watcher: tx,
1396 store_key_prefix: uuid,
1397 candidate_lease_ttl,
1398 meta_lease_ttl,
1399 sql_set: ElectionSqlFactory::new(table_name).build(),
1400 };
1401
1402 elected(&leader_mysql_election, table_name, None)
1403 .await
1404 .unwrap();
1405 let lease = get_lease(&leader_mysql_election).await.unwrap();
1406 assert_eq!(lease.leader_value, leader_value);
1407 assert!(lease.expire_time > lease.current);
1408 assert!(leader_mysql_election.is_leader());
1409
1410 match rx.recv().await {
1411 Ok(LeaderChangeMessage::Elected(key)) => {
1412 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1413 assert_eq!(
1414 String::from_utf8_lossy(key.key()),
1415 leader_mysql_election.election_key()
1416 );
1417 assert_eq!(key.lease_id(), i64::default());
1418 assert_eq!(key.revision(), i64::default());
1419 }
1420 _ => panic!("Expected LeaderChangeMessage::Elected"),
1421 }
1422
1423 leader_mysql_election
1424 .step_down_without_lock()
1425 .await
1426 .unwrap();
1427 let lease = get_lease(&leader_mysql_election).await.unwrap();
1428 assert_eq!(lease.leader_value, leader_value);
1429 assert!(!leader_mysql_election.is_leader());
1430
1431 match rx.recv().await {
1432 Ok(LeaderChangeMessage::StepDown(key)) => {
1433 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1434 assert_eq!(
1435 String::from_utf8_lossy(key.key()),
1436 leader_mysql_election.election_key()
1437 );
1438 assert_eq!(key.lease_id(), i64::default());
1439 assert_eq!(key.revision(), i64::default());
1440 }
1441 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1442 }
1443
1444 elected(&leader_mysql_election, table_name, Some(lease.clone()))
1445 .await
1446 .unwrap();
1447 let lease = get_lease(&leader_mysql_election).await.unwrap();
1448 assert_eq!(lease.leader_value, leader_value);
1449 assert!(lease.expire_time > lease.current);
1450 assert!(leader_mysql_election.is_leader());
1451
1452 match rx.recv().await {
1453 Ok(LeaderChangeMessage::Elected(key)) => {
1454 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1455 assert_eq!(
1456 String::from_utf8_lossy(key.key()),
1457 leader_mysql_election.election_key()
1458 );
1459 assert_eq!(key.lease_id(), i64::default());
1460 assert_eq!(key.revision(), i64::default());
1461 }
1462 _ => panic!("Expected LeaderChangeMessage::Elected"),
1463 }
1464
1465 drop_table(&leader_mysql_election.client, table_name).await;
1466 }
1467
1468 #[tokio::test]
1469 async fn test_campaign() {
1470 maybe_skip_mysql_integration_test!();
1471 let leader_value = "test_leader".to_string();
1472 let uuid = uuid::Uuid::new_v4().to_string();
1473 let table_name = "test_leader_action_greptime_metakv";
1474 let candidate_lease_ttl = Duration::from_secs(5);
1475 let meta_lease_ttl = Duration::from_secs(2);
1476 let execution_timeout = Duration::from_secs(10);
1477 let idle_session_timeout = Duration::from_secs(0);
1478 let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1479 .await
1480 .unwrap();
1481
1482 let (tx, mut rx) = broadcast::channel(100);
1483 let leader_mysql_election = MySqlElection {
1484 leader_value: leader_value.clone(),
1485 client,
1486 is_leader: AtomicBool::new(false),
1487 leader_infancy: AtomicBool::new(true),
1488 leader_watcher: tx,
1489 store_key_prefix: uuid,
1490 candidate_lease_ttl,
1491 meta_lease_ttl,
1492 sql_set: ElectionSqlFactory::new(table_name).build(),
1493 };
1494
1495 leader_mysql_election.do_campaign().await.unwrap();
1497 let lease = get_lease(&leader_mysql_election).await.unwrap();
1498 assert_eq!(lease.leader_value, leader_value);
1499 assert!(lease.expire_time > lease.current);
1500 assert!(leader_mysql_election.is_leader());
1501
1502 match rx.recv().await {
1503 Ok(LeaderChangeMessage::Elected(key)) => {
1504 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1505 assert_eq!(
1506 String::from_utf8_lossy(key.key()),
1507 leader_mysql_election.election_key()
1508 );
1509 assert_eq!(key.lease_id(), i64::default());
1510 assert_eq!(key.revision(), i64::default());
1511 }
1512 _ => panic!("Expected LeaderChangeMessage::Elected"),
1513 }
1514
1515 leader_mysql_election.do_campaign().await.unwrap();
1517 let new_lease = get_lease(&leader_mysql_election).await.unwrap();
1518 assert_eq!(lease.leader_value, leader_value);
1519 assert!(new_lease.expire_time > lease.expire_time);
1521 assert!(new_lease.expire_time > new_lease.current);
1522 assert!(leader_mysql_election.is_leader());
1523
1524 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1526 leader_mysql_election.do_campaign().await.unwrap();
1527 let lease = get_lease(&leader_mysql_election).await.unwrap();
1528 assert_eq!(lease.leader_value, leader_value);
1529 assert!(lease.expire_time <= lease.current);
1530 assert!(!leader_mysql_election.is_leader());
1531
1532 match rx.recv().await {
1533 Ok(LeaderChangeMessage::StepDown(key)) => {
1534 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1535 assert_eq!(
1536 String::from_utf8_lossy(key.key()),
1537 leader_mysql_election.election_key()
1538 );
1539 assert_eq!(key.lease_id(), i64::default());
1540 assert_eq!(key.revision(), i64::default());
1541 }
1542 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1543 }
1544
1545 leader_mysql_election.do_campaign().await.unwrap();
1547 let lease = get_lease(&leader_mysql_election).await.unwrap();
1548 assert_eq!(lease.leader_value, leader_value);
1549 assert!(lease.expire_time > lease.current);
1550 assert!(leader_mysql_election.is_leader());
1551
1552 match rx.recv().await {
1553 Ok(LeaderChangeMessage::Elected(key)) => {
1554 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1555 assert_eq!(
1556 String::from_utf8_lossy(key.key()),
1557 leader_mysql_election.election_key()
1558 );
1559 assert_eq!(key.lease_id(), i64::default());
1560 assert_eq!(key.revision(), i64::default());
1561 }
1562 _ => panic!("Expected LeaderChangeMessage::Elected"),
1563 }
1564
1565 {
1567 let client = leader_mysql_election.client.lock().await;
1568 let mut executor = Executor::Default(client);
1569 leader_mysql_election
1570 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1571 .await
1572 .unwrap();
1573 }
1574 leader_mysql_election.do_campaign().await.unwrap();
1575 let res = get_lease(&leader_mysql_election).await;
1576 assert!(res.is_none());
1577 assert!(!leader_mysql_election.is_leader());
1578
1579 match rx.recv().await {
1580 Ok(LeaderChangeMessage::StepDown(key)) => {
1581 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1582 assert_eq!(
1583 String::from_utf8_lossy(key.key()),
1584 leader_mysql_election.election_key()
1585 );
1586 assert_eq!(key.lease_id(), i64::default());
1587 assert_eq!(key.revision(), i64::default());
1588 }
1589 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1590 }
1591
1592 leader_mysql_election.do_campaign().await.unwrap();
1594 let lease = get_lease(&leader_mysql_election).await.unwrap();
1595 assert_eq!(lease.leader_value, leader_value);
1596 assert!(lease.expire_time > lease.current);
1597 assert!(leader_mysql_election.is_leader());
1598
1599 match rx.recv().await {
1600 Ok(LeaderChangeMessage::Elected(key)) => {
1601 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1602 assert_eq!(
1603 String::from_utf8_lossy(key.key()),
1604 leader_mysql_election.election_key()
1605 );
1606 assert_eq!(key.lease_id(), i64::default());
1607 assert_eq!(key.revision(), i64::default());
1608 }
1609 _ => panic!("Expected LeaderChangeMessage::Elected"),
1610 }
1611
1612 let another_leader_key = "another_leader";
1614 {
1615 let client = leader_mysql_election.client.lock().await;
1616 let mut executor = Executor::Default(client);
1617 leader_mysql_election
1618 .delete_value(&leader_mysql_election.election_key(), &mut executor)
1619 .await
1620 .unwrap();
1621 leader_mysql_election
1622 .put_value_with_lease(
1623 &leader_mysql_election.election_key(),
1624 another_leader_key,
1625 10,
1626 &mut executor,
1627 )
1628 .await
1629 .unwrap();
1630 }
1631 leader_mysql_election.do_campaign().await.unwrap();
1632 let lease = get_lease(&leader_mysql_election).await.unwrap();
1633 assert_eq!(lease.leader_value, another_leader_key);
1635 assert!(lease.expire_time > lease.current);
1636 assert!(!leader_mysql_election.is_leader());
1637
1638 match rx.recv().await {
1639 Ok(LeaderChangeMessage::StepDown(key)) => {
1640 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1641 assert_eq!(
1642 String::from_utf8_lossy(key.key()),
1643 leader_mysql_election.election_key()
1644 );
1645 assert_eq!(key.lease_id(), i64::default());
1646 assert_eq!(key.revision(), i64::default());
1647 }
1648 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1649 }
1650
1651 drop_table(&leader_mysql_election.client, table_name).await;
1652 }
1653
1654 #[tokio::test]
1655 async fn test_follower_action() {
1656 maybe_skip_mysql_integration_test!();
1657 common_telemetry::init_default_ut_logging();
1658 let candidate_lease_ttl = Duration::from_secs(5);
1659 let meta_lease_ttl = Duration::from_secs(1);
1660 let execution_timeout = Duration::from_secs(10);
1661 let idle_session_timeout = Duration::from_secs(0);
1662 let uuid = uuid::Uuid::new_v4().to_string();
1663 let table_name = "test_follower_action_greptime_metakv";
1664
1665 let follower_client =
1666 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1667 .await
1668 .unwrap();
1669 let (tx, mut rx) = broadcast::channel(100);
1670 let follower_mysql_election = MySqlElection {
1671 leader_value: "test_follower".to_string(),
1672 client: follower_client,
1673 is_leader: AtomicBool::new(false),
1674 leader_infancy: AtomicBool::new(true),
1675 leader_watcher: tx,
1676 store_key_prefix: uuid.clone(),
1677 candidate_lease_ttl,
1678 meta_lease_ttl,
1679 sql_set: ElectionSqlFactory::new(table_name).build(),
1680 };
1681
1682 let leader_client =
1683 create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)
1684 .await
1685 .unwrap();
1686 let (tx, _) = broadcast::channel(100);
1687 let leader_mysql_election = MySqlElection {
1688 leader_value: "test_leader".to_string(),
1689 client: leader_client,
1690 is_leader: AtomicBool::new(false),
1691 leader_infancy: AtomicBool::new(true),
1692 leader_watcher: tx,
1693 store_key_prefix: uuid,
1694 candidate_lease_ttl,
1695 meta_lease_ttl,
1696 sql_set: ElectionSqlFactory::new(table_name).build(),
1697 };
1698
1699 leader_mysql_election.do_campaign().await.unwrap();
1700
1701 follower_mysql_election.do_campaign().await.unwrap();
1703
1704 tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await;
1706 follower_mysql_election.do_campaign().await.unwrap();
1707 assert!(follower_mysql_election.is_leader());
1708
1709 match rx.recv().await {
1710 Ok(LeaderChangeMessage::Elected(key)) => {
1711 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1712 assert_eq!(
1713 String::from_utf8_lossy(key.key()),
1714 follower_mysql_election.election_key()
1715 );
1716 assert_eq!(key.lease_id(), i64::default());
1717 assert_eq!(key.revision(), i64::default());
1718 }
1719 _ => panic!("Expected LeaderChangeMessage::Elected"),
1720 }
1721
1722 drop_table(&follower_mysql_election.client, table_name).await;
1723 }
1724
1725 #[tokio::test]
1726 async fn test_wait_timeout() {
1727 maybe_skip_mysql_integration_test!();
1728 common_telemetry::init_default_ut_logging();
1729 let execution_timeout = Duration::from_secs(10);
1730 let idle_session_timeout = Duration::from_secs(1);
1731
1732 let client = create_mysql_client(None, execution_timeout, idle_session_timeout)
1733 .await
1734 .unwrap();
1735 tokio::time::sleep(Duration::from_millis(1100)).await;
1736 let err = client
1738 .lock()
1739 .await
1740 .query(sqlx::query("SELECT 1"), "SELECT 1")
1741 .await
1742 .unwrap_err();
1743 assert_matches!(err, error::Error::MySqlExecution { .. });
1744 client.lock().await.reset_client().await.unwrap();
1746 let _ = client
1747 .lock()
1748 .await
1749 .query(sqlx::query("SELECT 1"), "SELECT 1")
1750 .await
1751 .unwrap();
1752 }
1753}