1pub mod etcd;
16#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
17pub mod rds;
18
19use std::fmt::{self, Debug};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use common_telemetry::{error, info, warn};
24use serde::{Deserialize, Serialize};
25use tokio::sync::broadcast::error::RecvError;
26use tokio::sync::broadcast::{self, Receiver, Sender};
27
28use crate::error::Result;
29
/// Lease length, in seconds (600 s = 10 minutes), for a candidate's registration.
///
/// NOTE(review): presumably the TTL of the record written by
/// `Election::register_candidate`; confirm against the backends.
pub const CANDIDATE_LEASE_SECS: u64 = 10 * 60;
/// Keep-alive period for the candidate lease: half of `CANDIDATE_LEASE_SECS`.
///
/// Presumably this means the lease is refreshed about twice per lease period. It is
/// private, but still visible to this module's backend submodules.
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
32
/// The value reported for the current leader, decoded from raw bytes as UTF-8 text.
///
/// NOTE(review): presumably this is the leader's advertised address; confirm against
/// the election backends that produce it.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    /// Decodes `value` as UTF-8.
    ///
    /// Invalid byte sequences are replaced with `U+FFFD REPLACEMENT CHARACTER` rather
    /// than causing a failure.
    fn from(value: T) -> Self {
        // `into_owned` reuses the `String` the lossy decode had to allocate when it
        // replaced invalid bytes. The previous `to_string()` copied it a second time.
        Self(String::from_utf8_lossy(value.as_ref()).into_owned())
    }
}
42
/// Information about a metasrv node taking part in the election.
///
/// It is registered via `Election::register_candidate` and returned by
/// `Election::all_candidates`. Fields marked `#[serde(default)]` deserialize to their
/// `Default` value (`0` or an empty string) when absent. Presumably this lets records
/// written by older versions without those fields still be read (TODO confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The node's address; it becomes the `addr` of the `Peer` in the protobuf form.
    pub addr: String,
    /// The node's version string.
    pub version: String,
    /// The git commit of the node's build.
    pub git_commit: String,
    /// The node's start time; presumably Unix epoch milliseconds (TODO confirm).
    pub start_time_ms: u64,
    /// Total CPU capacity, in millicores.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// Total memory capacity, in bytes.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// Current CPU usage, in millicores.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// Current memory usage, in bytes.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// The host name of the machine running the node.
    #[serde(default)]
    pub hostname: String,
}
69
70#[allow(deprecated)]
72impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
73 fn from(node_info: MetasrvNodeInfo) -> Self {
74 Self {
75 peer: Some(api::v1::meta::Peer {
76 addr: node_info.addr,
77 ..Default::default()
78 }),
79 version: node_info.version.clone(),
82 git_commit: node_info.git_commit.clone(),
83 start_time_ms: node_info.start_time_ms,
84 cpus: node_info.total_cpu_millicores as u32,
85 memory_bytes: node_info.total_memory_bytes as u64,
86 info: Some(api::v1::meta::NodeInfo {
88 version: node_info.version,
89 git_commit: node_info.git_commit,
90 start_time_ms: node_info.start_time_ms,
91 total_cpu_millicores: node_info.total_cpu_millicores,
92 total_memory_bytes: node_info.total_memory_bytes,
93 cpu_usage_millicores: node_info.cpu_usage_millicores,
94 memory_usage_bytes: node_info.memory_usage_bytes,
95 cpus: node_info.total_cpu_millicores as u32,
96 memory_bytes: node_info.total_memory_bytes as u64,
97 hostname: node_info.hostname,
98 }),
99 }
100 }
101}
102
/// Notification that this node's leadership state changed.
///
/// It is published on the broadcast channel created by `listen_leader_change`, which
/// also logs each message. Its `Display` form is `Elected(LeaderKey { .. })` or
/// `StepDown(LeaderKey { .. })`.
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
    /// This node became the leader; carries the associated leader key.
    Elected(Arc<dyn LeaderKey>),
    /// This node stopped being the leader; carries the associated leader key.
    StepDown(Arc<dyn LeaderKey>),
}
109
/// The key that represents leadership in an election backend.
///
/// Implementations are carried as `Arc<dyn LeaderKey>` inside `LeaderChangeMessage`,
/// which is sent over a broadcast channel and derives `Debug`. That is why
/// `Send + Sync + Debug` are required.
pub trait LeaderKey: Send + Sync + Debug {
    /// The name of the election.
    ///
    /// Presumably this is the key prefix the candidates campaign on (TODO confirm).
    fn name(&self) -> &[u8];

    /// The key in the backing store that holds the leadership.
    ///
    /// Presumably derived from `name` plus a per-candidate suffix (TODO confirm).
    fn key(&self) -> &[u8];

    /// The revision associated with the leader key.
    ///
    /// Presumably the store revision at which the key was written (TODO confirm).
    fn revision(&self) -> i64;

    /// The id of the lease associated with the leader key.
    ///
    /// Backends without native leases may report a synthetic value (TODO confirm).
    fn lease_id(&self) -> i64;
}
126
127impl fmt::Display for LeaderChangeMessage {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 let leader_key = match self {
130 LeaderChangeMessage::Elected(leader_key) => {
131 write!(f, "Elected(")?;
132 leader_key
133 }
134 LeaderChangeMessage::StepDown(leader_key) => {
135 write!(f, "StepDown(")?;
136 leader_key
137 }
138 };
139 write!(f, "LeaderKey {{ ")?;
140 write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
141 write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
142 write!(f, ", rev: {}", leader_key.revision())?;
143 write!(f, ", lease: {}", leader_key.lease_id())?;
144 write!(f, " }})")
145 }
146}
147
148fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
149 let (tx, mut rx) = broadcast::channel(100);
150 let _handle = common_runtime::spawn_global(async move {
151 loop {
152 match rx.recv().await {
153 Ok(msg) => match msg {
154 LeaderChangeMessage::Elected(key) => {
155 info!(
156 "[{leader_value}] is elected as leader: {:?}, lease: {}",
157 String::from_utf8_lossy(key.name()),
158 key.lease_id()
159 );
160 }
161 LeaderChangeMessage::StepDown(key) => {
162 warn!(
163 "[{leader_value}] is stepping down: {:?}, lease: {}",
164 String::from_utf8_lossy(key.name()),
165 key.lease_id()
166 );
167 }
168 },
169 Err(RecvError::Lagged(_)) => {
170 warn!("Log printing is too slow or leader changed too fast!");
171 }
172 Err(RecvError::Closed) => break,
173 }
174 }
175 });
176 tx
177}
178
179fn send_leader_change_and_set_flags(
182 is_leader: &AtomicBool,
183 leader_infancy: &AtomicBool,
184 tx: &Sender<LeaderChangeMessage>,
185 msg: LeaderChangeMessage,
186) {
187 let is_elected = matches!(msg, LeaderChangeMessage::Elected(_));
188 if is_leader
189 .compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire)
190 .is_ok()
191 {
192 if is_elected {
193 leader_infancy.store(true, Ordering::Release);
194 }
195 if let Err(e) = tx.send(msg) {
196 error!(e; "Failed to send leader change message");
197 }
198 }
199}
200
/// Leader election among metasrv candidates.
///
/// NOTE(review): presumably implemented by the backend submodules declared in this
/// file (`etcd`, and `rds` behind the `pg_kvbackend` / `mysql_kvbackend` features).
#[async_trait::async_trait]
pub trait Election: Send + Sync {
    /// The value describing the current leader (`ElectionRef` fixes it to `LeaderValue`).
    type Leader;

    /// Whether this node currently considers itself the leader.
    fn is_leader(&self) -> bool;

    /// Whether this node is in its "leader infancy", i.e. it has only recently been
    /// elected.
    ///
    /// The shared helper `send_leader_change_and_set_flags` sets an infancy flag to
    /// `true` on every election. NOTE(review): what clears it, and what callers skip
    /// while it is set, is implementation-specific; confirm in the backends.
    fn in_leader_infancy(&self) -> bool;

    /// Registers `node_info` as an election candidate.
    ///
    /// Presumably this is held under a lease of `CANDIDATE_LEASE_SECS` (TODO confirm).
    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;

    /// Returns the node info of every registered candidate.
    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;

    /// Campaigns for leadership.
    ///
    /// NOTE(review): whether this returns once elected or keeps running to maintain
    /// leadership is backend-specific; confirm in the implementations.
    async fn campaign(&self) -> Result<()>;

    /// Resets any campaign state held by the implementation.
    ///
    /// The default implementation does nothing.
    async fn reset_campaign(&self) {}

    /// Returns the current leader.
    async fn leader(&self) -> Result<Self::Leader>;

    /// Gives up leadership held by this node.
    async fn resign(&self) -> Result<()>;

    /// Returns a new receiver for `LeaderChangeMessage` broadcasts.
    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
}
241
/// Shared, dynamically dispatched handle to an `Election` whose leader is a `LeaderValue`.
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;