1pub mod etcd;
16#[cfg(feature = "mysql_kvbackend")]
17pub mod mysql;
18#[cfg(feature = "pg_kvbackend")]
19pub mod postgres;
20
21use std::fmt::{self, Debug};
22use std::sync::Arc;
23
24use common_telemetry::{info, warn};
25use tokio::sync::broadcast::error::RecvError;
26use tokio::sync::broadcast::{self, Receiver, Sender};
27
28use crate::error::Result;
29use crate::metasrv::MetasrvNodeInfo;
30
/// Key under which the metasrv leader election is held.
pub const ELECTION_KEY: &str = "__metasrv_election";

/// Key prefix under which election candidates are registered.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// Lease duration, in seconds, attached to a candidate registration.
pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;

/// Interval, in seconds, between candidate lease keep-alives.
///
/// Set to half the lease so that one missed keep-alive still leaves time
/// to renew before the lease expires.
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
36
37#[derive(Debug, Clone)]
39pub enum LeaderChangeMessage {
40 Elected(Arc<dyn LeaderKey>),
41 StepDown(Arc<dyn LeaderKey>),
42}
43
/// Identity of a leader as recorded by the election backend.
///
/// Implementors must be `Send + Sync + Debug` so keys can be shared across
/// tasks inside [`LeaderChangeMessage`] and printed for diagnostics.
pub trait LeaderKey: Send + Sync + Debug {
    /// Raw bytes of the election name.
    ///
    /// Not guaranteed to be valid UTF-8; display code renders it lossily.
    fn name(&self) -> &[u8];

    /// Raw bytes of the leader's key.
    fn key(&self) -> &[u8];

    /// Revision number associated with the key.
    ///
    /// NOTE(review): presumably the backend revision at which the key was
    /// written — confirm per backend.
    fn revision(&self) -> i64;

    /// Identifier of the lease associated with the key.
    fn lease_id(&self) -> i64;
}
60
61impl fmt::Display for LeaderChangeMessage {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 let leader_key = match self {
64 LeaderChangeMessage::Elected(leader_key) => {
65 write!(f, "Elected(")?;
66 leader_key
67 }
68 LeaderChangeMessage::StepDown(leader_key) => {
69 write!(f, "StepDown(")?;
70 leader_key
71 }
72 };
73 write!(f, "LeaderKey {{ ")?;
74 write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
75 write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
76 write!(f, ", rev: {}", leader_key.revision())?;
77 write!(f, ", lease: {}", leader_key.lease_id())?;
78 write!(f, " }})")
79 }
80}
81
82fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
83 let (tx, mut rx) = broadcast::channel(100);
84 let _handle = common_runtime::spawn_global(async move {
85 loop {
86 match rx.recv().await {
87 Ok(msg) => match msg {
88 LeaderChangeMessage::Elected(key) => {
89 info!(
90 "[{leader_value}] is elected as leader: {:?}, lease: {}",
91 String::from_utf8_lossy(key.name()),
92 key.lease_id()
93 );
94 }
95 LeaderChangeMessage::StepDown(key) => {
96 warn!(
97 "[{leader_value}] is stepping down: {:?}, lease: {}",
98 String::from_utf8_lossy(key.name()),
99 key.lease_id()
100 );
101 }
102 },
103 Err(RecvError::Lagged(_)) => {
104 warn!("Log printing is too slow or leader changed too fast!");
105 }
106 Err(RecvError::Closed) => break,
107 }
108 }
109 });
110 tx
111}
112
113#[async_trait::async_trait]
114pub trait Election: Send + Sync {
115 type Leader;
116
117 fn is_leader(&self) -> bool;
119
120 fn in_leader_infancy(&self) -> bool;
126
127 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
129
130 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
132
133 async fn campaign(&self) -> Result<()>;
138
139 async fn leader(&self) -> Result<Self::Leader>;
141
142 async fn resign(&self) -> Result<()>;
145
146 fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
147}