1pub mod etcd;
16#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
17pub mod rds;
18
19use std::fmt::{self, Debug};
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use common_telemetry::{error, info, warn};
24use tokio::sync::broadcast::error::RecvError;
25use tokio::sync::broadcast::{self, Receiver, Sender};
26
27use crate::error::Result;
28use crate::metasrv::MetasrvNodeInfo;
29
/// Lease length, in seconds, used for candidate registrations.
///
/// NOTE(review): meaning inferred from the name; the consumers presumably live in
/// the `etcd` / `rds` backend submodules — confirm there.
pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;

/// Keep-alive period, in seconds: exactly half of [`CANDIDATE_LEASE_SECS`].
///
/// Presumably halved so a lease is refreshed well before it can lapse — confirm
/// against the backends that use it.
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
32
33#[derive(Debug, Clone)]
35pub enum LeaderChangeMessage {
36 Elected(Arc<dyn LeaderKey>),
37 StepDown(Arc<dyn LeaderKey>),
38}
39
/// Identity of one leadership term, as produced by an election backend.
///
/// The trait is object-safe and is passed around as `Arc<dyn LeaderKey>` inside
/// [`LeaderChangeMessage`]. The `Send + Sync` bounds let it cross task boundaries.
pub trait LeaderKey: Send + Sync + Debug {
    /// Raw bytes of the election name.
    ///
    /// NOTE(review): presumably the election/campaign name; it is what the
    /// leader-change logger prints as the leader — confirm per backend.
    fn name(&self) -> &[u8];

    /// Raw bytes of the key that represents this leadership in the backend store.
    ///
    /// NOTE(review): exact meaning is backend-specific — confirm per implementation.
    fn key(&self) -> &[u8];

    /// Backend revision associated with this leadership.
    ///
    /// NOTE(review): presumably the store revision at which the key was written.
    fn revision(&self) -> i64;

    /// Identifier of the lease that keeps this leadership alive.
    fn lease_id(&self) -> i64;
}
56
57impl fmt::Display for LeaderChangeMessage {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 let leader_key = match self {
60 LeaderChangeMessage::Elected(leader_key) => {
61 write!(f, "Elected(")?;
62 leader_key
63 }
64 LeaderChangeMessage::StepDown(leader_key) => {
65 write!(f, "StepDown(")?;
66 leader_key
67 }
68 };
69 write!(f, "LeaderKey {{ ")?;
70 write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
71 write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
72 write!(f, ", rev: {}", leader_key.revision())?;
73 write!(f, ", lease: {}", leader_key.lease_id())?;
74 write!(f, " }})")
75 }
76}
77
78fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
79 let (tx, mut rx) = broadcast::channel(100);
80 let _handle = common_runtime::spawn_global(async move {
81 loop {
82 match rx.recv().await {
83 Ok(msg) => match msg {
84 LeaderChangeMessage::Elected(key) => {
85 info!(
86 "[{leader_value}] is elected as leader: {:?}, lease: {}",
87 String::from_utf8_lossy(key.name()),
88 key.lease_id()
89 );
90 }
91 LeaderChangeMessage::StepDown(key) => {
92 warn!(
93 "[{leader_value}] is stepping down: {:?}, lease: {}",
94 String::from_utf8_lossy(key.name()),
95 key.lease_id()
96 );
97 }
98 },
99 Err(RecvError::Lagged(_)) => {
100 warn!("Log printing is too slow or leader changed too fast!");
101 }
102 Err(RecvError::Closed) => break,
103 }
104 }
105 });
106 tx
107}
108
109fn send_leader_change_and_set_flags(
112 is_leader: &AtomicBool,
113 leader_infancy: &AtomicBool,
114 tx: &Sender<LeaderChangeMessage>,
115 msg: LeaderChangeMessage,
116) {
117 let is_elected = matches!(msg, LeaderChangeMessage::Elected(_));
118 if is_leader
119 .compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire)
120 .is_ok()
121 {
122 if is_elected {
123 leader_infancy.store(true, Ordering::Release);
124 }
125 if let Err(e) = tx.send(msg) {
126 error!(e; "Failed to send leader change message");
127 }
128 }
129}
130
131#[async_trait::async_trait]
132pub trait Election: Send + Sync {
133 type Leader;
134
135 fn is_leader(&self) -> bool;
137
138 fn in_leader_infancy(&self) -> bool;
144
145 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
147
148 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
150
151 async fn campaign(&self) -> Result<()>;
156
157 async fn reset_campaign(&self) {}
161
162 async fn leader(&self) -> Result<Self::Leader>;
164
165 async fn resign(&self) -> Result<()>;
168
169 fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
170}