1use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
20use common_telemetry::{error, info, warn};
21use etcd_client::{
22 Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
23};
24use snafu::{ensure, OptionExt, ResultExt};
25use tokio::sync::broadcast;
26use tokio::sync::broadcast::Receiver;
27use tokio::time::{timeout, MissedTickBehavior};
28
29use crate::election::{
30 listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
31 CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
32};
33use crate::error;
34use crate::error::Result;
35use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
36
37impl LeaderKey for EtcdLeaderKey {
38 fn name(&self) -> &[u8] {
39 self.name()
40 }
41
42 fn key(&self) -> &[u8] {
43 self.key()
44 }
45
46 fn revision(&self) -> i64 {
47 self.rev()
48 }
49
50 fn lease_id(&self) -> i64 {
51 self.lease()
52 }
53}
54
/// Leader election implemented on top of an etcd client.
///
/// Besides the client, it keeps this node's local view of the election
/// (leadership and "just elected" flags) and a broadcast channel used to
/// publish leader changes.
pub struct EtcdElection {
    /// Value this node campaigns with; also appended to the candidate root to
    /// form this node's candidate key.
    leader_value: String,
    /// etcd client used for the lease, kv and election APIs.
    client: Client,
    /// `true` while this node believes it holds leadership.
    is_leader: AtomicBool,
    /// Set to `true` when leadership is newly acquired; cleared by the first
    /// successful `in_leader_infancy` check.
    infancy: AtomicBool,
    /// Sender used to broadcast `LeaderChangeMessage`s to subscribers.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidates root.
    store_key_prefix: String,
}
63
64impl EtcdElection {
65 pub async fn with_endpoints<E, S>(
66 leader_value: E,
67 endpoints: S,
68 store_key_prefix: String,
69 ) -> Result<ElectionRef>
70 where
71 E: AsRef<str>,
72 S: AsRef<[E]>,
73 {
74 let client = Client::connect(endpoints, None)
75 .await
76 .context(error::ConnectEtcdSnafu)?;
77
78 Self::with_etcd_client(leader_value, client, store_key_prefix).await
79 }
80
81 pub async fn with_etcd_client<E>(
82 leader_value: E,
83 client: Client,
84 store_key_prefix: String,
85 ) -> Result<ElectionRef>
86 where
87 E: AsRef<str>,
88 {
89 let leader_value: String = leader_value.as_ref().into();
90 let tx = listen_leader_change(leader_value.clone());
91 Ok(Arc::new(Self {
92 leader_value,
93 client,
94 is_leader: AtomicBool::new(false),
95 infancy: AtomicBool::new(false),
96 leader_watcher: tx,
97 store_key_prefix,
98 }))
99 }
100
101 fn election_key(&self) -> String {
102 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
103 }
104
105 fn candidate_root(&self) -> String {
106 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
107 }
108
109 fn candidate_key(&self) -> String {
110 format!("{}{}", self.candidate_root(), self.leader_value)
111 }
112}
113
114#[async_trait::async_trait]
115impl Election for EtcdElection {
116 type Leader = LeaderValue;
117
118 fn is_leader(&self) -> bool {
119 self.is_leader.load(Ordering::Relaxed)
120 }
121
122 fn in_leader_infancy(&self) -> bool {
123 self.infancy
124 .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
125 .is_ok()
126 }
127
128 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
129 let mut lease_client = self.client.lease_client();
130 let res = lease_client
131 .grant(CANDIDATE_LEASE_SECS as i64, None)
132 .await
133 .context(error::EtcdFailedSnafu)?;
134 let lease_id = res.id();
135
136 let key = self.candidate_key().into_bytes();
138 let value = serde_json::to_string(node_info)
139 .with_context(|_| error::SerializeToJsonSnafu {
140 input: format!("{node_info:?}"),
141 })?
142 .into_bytes();
143 self.client
145 .kv_client()
146 .put(key, value, Some(PutOptions::new().with_lease(lease_id)))
147 .await
148 .context(error::EtcdFailedSnafu)?;
149
150 let (mut keeper, mut receiver) = lease_client
151 .keep_alive(lease_id)
152 .await
153 .context(error::EtcdFailedSnafu)?;
154
155 let mut keep_alive_interval =
156 tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
157
158 loop {
159 let _ = keep_alive_interval.tick().await;
160 keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
161
162 if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
163 if res.ttl() <= 0 {
164 warn!("Candidate lease expired, key: {}", self.candidate_key());
165 break;
166 }
167 }
168 }
169
170 Ok(())
171 }
172
173 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
174 let key = self.candidate_root().into_bytes();
175 let res = self
176 .client
177 .kv_client()
178 .get(key, Some(GetOptions::new().with_prefix()))
179 .await
180 .context(error::EtcdFailedSnafu)?;
181
182 let mut nodes = Vec::with_capacity(res.kvs().len());
183 for kv in res.kvs() {
184 let node =
185 serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
186 error::DeserializeFromJsonSnafu {
187 input: String::from_utf8_lossy(kv.value()),
188 }
189 })?;
190 nodes.push(node);
191 }
192
193 Ok(nodes)
194 }
195
196 async fn campaign(&self) -> Result<()> {
197 let mut lease_client = self.client.lease_client();
198 let mut election_client = self.client.election_client();
199 let res = lease_client
200 .grant(META_LEASE_SECS as i64, None)
201 .await
202 .context(error::EtcdFailedSnafu)?;
203 let lease_id = res.id();
204
205 info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
206
207 let res = election_client
215 .campaign(self.election_key(), self.leader_value.clone(), lease_id)
216 .await
217 .context(error::EtcdFailedSnafu)?;
218
219 if let Some(leader) = res.leader() {
220 let (mut keeper, mut receiver) = lease_client
221 .keep_alive(lease_id)
222 .await
223 .context(error::EtcdFailedSnafu)?;
224
225 let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
226 let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
227 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
228 loop {
229 match timeout(
231 keep_lease_duration,
232 self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
233 )
234 .await
235 {
236 Ok(Ok(())) => {
237 let _ = keep_alive_interval.tick().await;
238 }
239 Ok(Err(err)) => {
240 error!(err; "Failed to keep alive");
241 break;
242 }
243 Err(_) => {
244 error!("Refresh lease timeout");
245 break;
246 }
247 }
248 }
249
250 if self
251 .is_leader
252 .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
253 .is_ok()
254 {
255 if let Err(e) = self
256 .leader_watcher
257 .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
258 {
259 error!(e; "Failed to send leader change message");
260 }
261 }
262 }
263
264 Ok(())
265 }
266
267 async fn leader(&self) -> Result<LeaderValue> {
268 if self.is_leader.load(Ordering::Relaxed) {
269 Ok(self.leader_value.as_bytes().into())
270 } else {
271 let res = self
272 .client
273 .election_client()
274 .leader(self.election_key())
275 .await
276 .context(error::EtcdFailedSnafu)?;
277 let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
278 Ok(leader_value.into())
279 }
280 }
281
282 async fn resign(&self) -> Result<()> {
283 todo!()
284 }
285
286 fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
287 self.leader_watcher.subscribe()
288 }
289}
290
291impl EtcdElection {
292 async fn keep_alive(
293 &self,
294 keeper: &mut LeaseKeeper,
295 receiver: &mut LeaseKeepAliveStream,
296 leader: EtcdLeaderKey,
297 ) -> Result<()> {
298 keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
299 if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
300 ensure!(
301 res.ttl() > 0,
302 error::UnexpectedSnafu {
303 violated: "Failed to refresh the lease",
304 }
305 );
306
307 if self
309 .is_leader
310 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
311 .is_ok()
312 {
313 self.infancy.store(true, Ordering::Relaxed);
314
315 if let Err(e) = self
316 .leader_watcher
317 .send(LeaderChangeMessage::Elected(Arc::new(leader)))
318 {
319 error!(e; "Failed to send leader change message");
320 }
321 }
322 }
323
324 Ok(())
325 }
326}