meta_srv/election/
etcd.rs1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
20use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
21use common_telemetry::{error, info, warn};
22use etcd_client::{
23 Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
24};
25use snafu::{OptionExt, ResultExt, ensure};
26use tokio::sync::broadcast;
27use tokio::sync::broadcast::Receiver;
28use tokio::time::{MissedTickBehavior, timeout};
29
30use crate::election::{
31 CANDIDATE_LEASE_SECS, Election, KEEP_ALIVE_INTERVAL_SECS, LeaderChangeMessage, LeaderKey,
32 listen_leader_change, send_leader_change_and_set_flags,
33};
34use crate::error;
35use crate::error::Result;
36use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
37
38impl LeaderKey for EtcdLeaderKey {
39 fn name(&self) -> &[u8] {
40 self.name()
41 }
42
43 fn key(&self) -> &[u8] {
44 self.key()
45 }
46
47 fn revision(&self) -> i64 {
48 self.rev()
49 }
50
51 fn lease_id(&self) -> i64 {
52 self.lease()
53 }
54}
55
56pub struct EtcdElection {
57 leader_value: String,
58 client: Client,
59 is_leader: AtomicBool,
60 infancy: AtomicBool,
61 leader_watcher: broadcast::Sender<LeaderChangeMessage>,
62 store_key_prefix: String,
63}
64
65impl EtcdElection {
66 pub async fn with_etcd_client<E>(
67 leader_value: E,
68 client: Client,
69 store_key_prefix: String,
70 ) -> Result<ElectionRef>
71 where
72 E: AsRef<str>,
73 {
74 let leader_value: String = leader_value.as_ref().into();
75 let tx = listen_leader_change(leader_value.clone());
76 Ok(Arc::new(Self {
77 leader_value,
78 client,
79 is_leader: AtomicBool::new(false),
80 infancy: AtomicBool::new(false),
81 leader_watcher: tx,
82 store_key_prefix,
83 }))
84 }
85
86 fn election_key(&self) -> String {
87 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
88 }
89
90 fn candidate_root(&self) -> String {
91 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
92 }
93
94 fn candidate_key(&self) -> String {
95 format!("{}{}", self.candidate_root(), self.leader_value)
96 }
97}
98
99#[async_trait::async_trait]
100impl Election for EtcdElection {
101 type Leader = LeaderValue;
102
103 fn is_leader(&self) -> bool {
104 self.is_leader.load(Ordering::Relaxed)
105 }
106
107 fn in_leader_infancy(&self) -> bool {
108 self.infancy
109 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
110 .is_ok()
111 }
112
113 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
114 let mut lease_client = self.client.lease_client();
115 let res: etcd_client::LeaseGrantResponse = lease_client
116 .grant(CANDIDATE_LEASE_SECS as i64, None)
117 .await
118 .context(error::EtcdFailedSnafu)?;
119 let lease_id = res.id();
120
121 let key = self.candidate_key().into_bytes();
123 let value = serde_json::to_string(node_info)
124 .with_context(|_| error::SerializeToJsonSnafu {
125 input: format!("{node_info:?}"),
126 })?
127 .into_bytes();
128 self.client
130 .kv_client()
131 .put(key, value, Some(PutOptions::new().with_lease(lease_id)))
132 .await
133 .context(error::EtcdFailedSnafu)?;
134
135 let (mut keeper, mut receiver) = lease_client
136 .keep_alive(lease_id)
137 .await
138 .context(error::EtcdFailedSnafu)?;
139
140 let mut keep_alive_interval =
141 tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
142
143 loop {
144 let _ = keep_alive_interval.tick().await;
145 keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
146
147 if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)?
148 && res.ttl() <= 0
149 {
150 warn!("Candidate lease expired, key: {}", self.candidate_key());
151 break;
152 }
153 }
154
155 Ok(())
156 }
157
158 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
159 let key = self.candidate_root().into_bytes();
160 let res = self
161 .client
162 .kv_client()
163 .get(key, Some(GetOptions::new().with_prefix()))
164 .await
165 .context(error::EtcdFailedSnafu)?;
166
167 let mut nodes = Vec::with_capacity(res.kvs().len());
168 for kv in res.kvs() {
169 let node =
170 serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
171 error::DeserializeFromJsonSnafu {
172 input: String::from_utf8_lossy(kv.value()),
173 }
174 })?;
175 nodes.push(node);
176 }
177
178 Ok(nodes)
179 }
180
181 async fn campaign(&self) -> Result<()> {
182 let mut lease_client = self.client.lease_client();
183 let mut election_client = self.client.election_client();
184 let res = lease_client
185 .grant(META_LEASE_SECS as i64, None)
186 .await
187 .context(error::EtcdFailedSnafu)?;
188 let lease_id = res.id();
189
190 info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
191
192 let res = election_client
200 .campaign(self.election_key(), self.leader_value.clone(), lease_id)
201 .await
202 .context(error::EtcdFailedSnafu)?;
203
204 if let Some(leader) = res.leader() {
205 let (mut keeper, mut receiver) = lease_client
206 .keep_alive(lease_id)
207 .await
208 .context(error::EtcdFailedSnafu)?;
209
210 let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
211 let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
212 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
213 loop {
214 match timeout(
216 keep_lease_duration,
217 self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
218 )
219 .await
220 {
221 Ok(Ok(())) => {
222 let _ = keep_alive_interval.tick().await;
223 }
224 Ok(Err(err)) => {
225 error!(err; "Failed to keep alive");
226 break;
227 }
228 Err(_) => {
229 error!("Refresh lease timeout");
230 break;
231 }
232 }
233 }
234
235 send_leader_change_and_set_flags(
236 &self.is_leader,
237 &self.infancy,
238 &self.leader_watcher,
239 LeaderChangeMessage::StepDown(Arc::new(leader.clone())),
240 );
241 }
242
243 Ok(())
244 }
245
246 async fn leader(&self) -> Result<LeaderValue> {
247 if self.is_leader.load(Ordering::Relaxed) {
248 Ok(self.leader_value.as_bytes().into())
249 } else {
250 let res = self
251 .client
252 .election_client()
253 .leader(self.election_key())
254 .await
255 .context(error::EtcdFailedSnafu)?;
256 let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
257 Ok(leader_value.into())
258 }
259 }
260
261 async fn resign(&self) -> Result<()> {
262 todo!()
263 }
264
265 fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
266 self.leader_watcher.subscribe()
267 }
268}
269
270impl EtcdElection {
271 async fn keep_alive(
272 &self,
273 keeper: &mut LeaseKeeper,
274 receiver: &mut LeaseKeepAliveStream,
275 leader: EtcdLeaderKey,
276 ) -> Result<()> {
277 keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
278 if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
279 ensure!(
280 res.ttl() > 0,
281 error::UnexpectedSnafu {
282 violated: "Failed to refresh the lease",
283 }
284 );
285
286 send_leader_change_and_set_flags(
288 &self.is_leader,
289 &self.infancy,
290 &self.leader_watcher,
291 LeaderChangeMessage::Elected(Arc::new(leader.clone())),
292 );
293 }
294
295 Ok(())
296 }
297}