use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, info, warn};
use etcd_client::{
    Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};

use crate::election::{
    listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
    LeaderKey, CANDIDATE_LEASE_SECS, KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

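// The trait methods below delegate to etcd_client::LeaderKey's inherent accessors
// (`name`, `key`, `rev`, `lease`); inherent methods take precedence during method
// resolution, so these calls do not recurse into the trait impl itself.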
impl LeaderKey for EtcdLeaderKey {
    fn name(&self) -> &[u8] {
        self.name()
    }

    fn key(&self) -> &[u8] {
        self.key()
    }

    fn revision(&self) -> i64 {
        self.rev()
    }

    fn lease_id(&self) -> i64 {
        self.lease()
    }
}

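/// Leader election for metasrv backed by etcd's election and lease APIs.
/// All election and candidate keys are namespaced by `store_key_prefix`.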
pub struct EtcdElection {
    leader_value: String,
    client: Client,
    is_leader: AtomicBool,
    infancy: AtomicBool,
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    store_key_prefix: String,
}

impl EtcdElection {
    pub async fn with_endpoints<E, S>(
        leader_value: E,
        endpoints: S,
        store_key_prefix: String,
    ) -> Result<ElectionRef>
    where
        E: AsRef<str>,
        S: AsRef<[E]>,
    {
        let client = Client::connect(endpoints, None)
            .await
            .context(error::ConnectEtcdSnafu)?;

        Self::with_etcd_client(leader_value, client, store_key_prefix).await
    }

    pub async fn with_etcd_client<E>(
        leader_value: E,
        client: Client,
        store_key_prefix: String,
    ) -> Result<ElectionRef>
    where
        E: AsRef<str>,
    {
        let leader_value: String = leader_value.as_ref().into();
        let tx = listen_leader_change(leader_value.clone());
        Ok(Arc::new(Self {
            leader_value,
            client,
            is_leader: AtomicBool::new(false),
            infancy: AtomicBool::new(false),
            leader_watcher: tx,
            store_key_prefix,
        }))
    }

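    // Key helpers: every key this election touches lives under `store_key_prefix`.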
    fn election_key(&self) -> String {
        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
    }

    fn candidate_root(&self) -> String {
        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
    }

    fn candidate_key(&self) -> String {
        format!("{}{}", self.candidate_root(), self.leader_value)
    }
}

#[async_trait::async_trait]
impl Election for EtcdElection {
    type Leader = LeaderValue;

    fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Relaxed)
    }

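    // Atomically consumes the infancy flag: returns true at most once after this
    // node is newly elected, then false until the next election.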
    fn in_leader_infancy(&self) -> bool {
        self.infancy
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

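    // Registers this node as a candidate: its info is stored under a lease of
    // `CANDIDATE_LEASE_SECS`, and this call then stays in a loop, renewing the lease
    // every `KEEP_ALIVE_INTERVAL_SECS` until it can no longer be kept alive.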
    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
        let mut lease_client = self.client.lease_client();
        let res = lease_client
            .grant(CANDIDATE_LEASE_SECS as i64, None)
            .await
            .context(error::EtcdFailedSnafu)?;
        let lease_id = res.id();

        let key = self.candidate_key().into_bytes();
        let value = serde_json::to_string(node_info)
            .with_context(|_| error::SerializeToJsonSnafu {
                input: format!("{node_info:?}"),
            })?
            .into_bytes();
        self.client
            .kv_client()
            .put(key, value, Some(PutOptions::new().with_lease(lease_id)))
            .await
            .context(error::EtcdFailedSnafu)?;

        let (mut keeper, mut receiver) = lease_client
            .keep_alive(lease_id)
            .await
            .context(error::EtcdFailedSnafu)?;

        let mut keep_alive_interval =
            tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));

        loop {
            let _ = keep_alive_interval.tick().await;
            keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;

            if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
                if res.ttl() <= 0 {
                    warn!("Candidate lease expired, key: {}", self.candidate_key());
                    break;
                }
            }
        }

        Ok(())
    }

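    // Lists every registered candidate by scanning the candidate key prefix.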
    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
        let key = self.candidate_root().into_bytes();
        let res = self
            .client
            .kv_client()
            .get(key, Some(GetOptions::new().with_prefix()))
            .await
            .context(error::EtcdFailedSnafu)?;

        let mut nodes = Vec::with_capacity(res.kvs().len());
        for kv in res.kvs() {
            let node =
                serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
                    error::DeserializeFromJsonSnafu {
                        input: String::from_utf8_lossy(kv.value()),
                    }
                })?;
            nodes.push(node);
        }

        Ok(nodes)
    }

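    // Campaigns for leadership under `election_key` using a `META_LEASE_SECS` lease.
    // Once elected, the lease is refreshed every `META_KEEP_ALIVE_INTERVAL_SECS`; if a
    // refresh fails or times out, a `StepDown` message is broadcast and the method returns.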
    async fn campaign(&self) -> Result<()> {
        let mut lease_client = self.client.lease_client();
        let mut election_client = self.client.election_client();
        let res = lease_client
            .grant(META_LEASE_SECS as i64, None)
            .await
            .context(error::EtcdFailedSnafu)?;
        let lease_id = res.id();

        info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);

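        // etcd's campaign call only resolves once this node has been elected leader
        // (or the request fails); the granted lease keeps the leadership key alive.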
        let res = election_client
            .campaign(self.election_key(), self.leader_value.clone(), lease_id)
            .await
            .context(error::EtcdFailedSnafu)?;

        if let Some(leader) = res.leader() {
            let (mut keeper, mut receiver) = lease_client
                .keep_alive(lease_id)
                .await
                .context(error::EtcdFailedSnafu)?;

            let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
            let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
            keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
            loop {
                match timeout(
                    keep_lease_duration,
                    self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
                )
                .await
                {
                    Ok(Ok(())) => {
                        let _ = keep_alive_interval.tick().await;
                    }
                    Ok(Err(err)) => {
                        error!(err; "Failed to keep alive");
                        break;
                    }
                    Err(_) => {
                        error!("Refresh lease timeout");
                        break;
                    }
                }
            }

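            // The keep-alive loop exited, meaning the lease could not be refreshed in
            // time: update the leader flags and broadcast a step-down.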
            send_leader_change_and_set_flags(
                &self.is_leader,
                &self.infancy,
                &self.leader_watcher,
                LeaderChangeMessage::StepDown(Arc::new(leader.clone())),
            );
        }

        Ok(())
    }

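    // Returns the current leader: answered locally when this node holds leadership,
    // otherwise queried from etcd.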
    async fn leader(&self) -> Result<LeaderValue> {
        if self.is_leader.load(Ordering::Relaxed) {
            Ok(self.leader_value.as_bytes().into())
        } else {
            let res = self
                .client
                .election_client()
                .leader(self.election_key())
                .await
                .context(error::EtcdFailedSnafu)?;
            let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
            Ok(leader_value.into())
        }
    }

    async fn resign(&self) -> Result<()> {
        todo!()
    }

    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
        self.leader_watcher.subscribe()
    }
}

impl EtcdElection {
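    // Sends a single keep-alive request for the campaign lease; while the lease stays
    // valid, leadership is re-asserted via `send_leader_change_and_set_flags`.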
    async fn keep_alive(
        &self,
        keeper: &mut LeaseKeeper,
        receiver: &mut LeaseKeepAliveStream,
        leader: EtcdLeaderKey,
    ) -> Result<()> {
        keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
        if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
            ensure!(
                res.ttl() > 0,
                error::UnexpectedSnafu {
                    violated: "Failed to refresh the lease",
                }
            );

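            // The lease is still valid; update the leader flags for the `Elected` state.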
            send_leader_change_and_set_flags(
                &self.is_leader,
                &self.infancy,
                &self.leader_watcher,
                LeaderChangeMessage::Elected(Arc::new(leader.clone())),
            );
        }

        Ok(())
    }
}