meta_srv/election/
etcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
20use common_telemetry::{error, info, warn};
21use etcd_client::{
22    Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
23};
24use snafu::{ensure, OptionExt, ResultExt};
25use tokio::sync::broadcast;
26use tokio::sync::broadcast::Receiver;
27use tokio::time::{timeout, MissedTickBehavior};
28
29use crate::election::{
30    listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
31    CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
32};
33use crate::error;
34use crate::error::Result;
35use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
36
37impl LeaderKey for EtcdLeaderKey {
38    fn name(&self) -> &[u8] {
39        self.name()
40    }
41
42    fn key(&self) -> &[u8] {
43        self.key()
44    }
45
46    fn revision(&self) -> i64 {
47        self.rev()
48    }
49
50    fn lease_id(&self) -> i64 {
51        self.lease()
52    }
53}
54
/// Leader election for a metasrv node, backed by etcd.
pub struct EtcdElection {
    /// Value proposed to etcd when campaigning; also the suffix of this node's
    /// candidate key.
    leader_value: String,
    /// etcd client from which the lease, KV and election clients are obtained.
    client: Client,
    /// `true` from the first successful lease keep-alive after a won campaign
    /// until the keep-alive loop in `campaign` stops.
    is_leader: AtomicBool,
    /// Set when this node becomes leader; read and cleared by `in_leader_infancy`.
    infancy: AtomicBool,
    /// Sender used to broadcast `Elected` / `StepDown` leader change messages.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and to the candidates root.
    store_key_prefix: String,
}
63
64impl EtcdElection {
65    pub async fn with_endpoints<E, S>(
66        leader_value: E,
67        endpoints: S,
68        store_key_prefix: String,
69    ) -> Result<ElectionRef>
70    where
71        E: AsRef<str>,
72        S: AsRef<[E]>,
73    {
74        let client = Client::connect(endpoints, None)
75            .await
76            .context(error::ConnectEtcdSnafu)?;
77
78        Self::with_etcd_client(leader_value, client, store_key_prefix).await
79    }
80
81    pub async fn with_etcd_client<E>(
82        leader_value: E,
83        client: Client,
84        store_key_prefix: String,
85    ) -> Result<ElectionRef>
86    where
87        E: AsRef<str>,
88    {
89        let leader_value: String = leader_value.as_ref().into();
90        let tx = listen_leader_change(leader_value.clone());
91        Ok(Arc::new(Self {
92            leader_value,
93            client,
94            is_leader: AtomicBool::new(false),
95            infancy: AtomicBool::new(false),
96            leader_watcher: tx,
97            store_key_prefix,
98        }))
99    }
100
101    fn election_key(&self) -> String {
102        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
103    }
104
105    fn candidate_root(&self) -> String {
106        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
107    }
108
109    fn candidate_key(&self) -> String {
110        format!("{}{}", self.candidate_root(), self.leader_value)
111    }
112}
113
114#[async_trait::async_trait]
115impl Election for EtcdElection {
116    type Leader = LeaderValue;
117
118    fn is_leader(&self) -> bool {
119        self.is_leader.load(Ordering::Relaxed)
120    }
121
122    fn in_leader_infancy(&self) -> bool {
123        self.infancy
124            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
125            .is_ok()
126    }
127
128    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
129        let mut lease_client = self.client.lease_client();
130        let res = lease_client
131            .grant(CANDIDATE_LEASE_SECS as i64, None)
132            .await
133            .context(error::EtcdFailedSnafu)?;
134        let lease_id = res.id();
135
136        // The register info: key is the candidate key, value is its node info(addr, version, git_commit).
137        let key = self.candidate_key().into_bytes();
138        let value = serde_json::to_string(node_info)
139            .with_context(|_| error::SerializeToJsonSnafu {
140                input: format!("{node_info:?}"),
141            })?
142            .into_bytes();
143        // Puts with the lease id
144        self.client
145            .kv_client()
146            .put(key, value, Some(PutOptions::new().with_lease(lease_id)))
147            .await
148            .context(error::EtcdFailedSnafu)?;
149
150        let (mut keeper, mut receiver) = lease_client
151            .keep_alive(lease_id)
152            .await
153            .context(error::EtcdFailedSnafu)?;
154
155        let mut keep_alive_interval =
156            tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
157
158        loop {
159            let _ = keep_alive_interval.tick().await;
160            keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
161
162            if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
163                if res.ttl() <= 0 {
164                    warn!("Candidate lease expired, key: {}", self.candidate_key());
165                    break;
166                }
167            }
168        }
169
170        Ok(())
171    }
172
173    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
174        let key = self.candidate_root().into_bytes();
175        let res = self
176            .client
177            .kv_client()
178            .get(key, Some(GetOptions::new().with_prefix()))
179            .await
180            .context(error::EtcdFailedSnafu)?;
181
182        let mut nodes = Vec::with_capacity(res.kvs().len());
183        for kv in res.kvs() {
184            let node =
185                serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
186                    error::DeserializeFromJsonSnafu {
187                        input: String::from_utf8_lossy(kv.value()),
188                    }
189                })?;
190            nodes.push(node);
191        }
192
193        Ok(nodes)
194    }
195
196    async fn campaign(&self) -> Result<()> {
197        let mut lease_client = self.client.lease_client();
198        let mut election_client = self.client.election_client();
199        let res = lease_client
200            .grant(META_LEASE_SECS as i64, None)
201            .await
202            .context(error::EtcdFailedSnafu)?;
203        let lease_id = res.id();
204
205        info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
206
207        // Campaign, waits to acquire leadership in an election, returning
208        // a LeaderKey representing the leadership if successful.
209        //
210        // The method will be blocked until the election is won, and after
211        // passing the method, it is necessary to execute `keep_alive` immediately
212        // to confirm that it is a valid leader, because it is possible that the
213        // election's lease expires.
214        let res = election_client
215            .campaign(self.election_key(), self.leader_value.clone(), lease_id)
216            .await
217            .context(error::EtcdFailedSnafu)?;
218
219        if let Some(leader) = res.leader() {
220            let (mut keeper, mut receiver) = lease_client
221                .keep_alive(lease_id)
222                .await
223                .context(error::EtcdFailedSnafu)?;
224
225            let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
226            let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
227            keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
228            loop {
229                // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
230                match timeout(
231                    keep_lease_duration,
232                    self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
233                )
234                .await
235                {
236                    Ok(Ok(())) => {
237                        let _ = keep_alive_interval.tick().await;
238                    }
239                    Ok(Err(err)) => {
240                        error!(err; "Failed to keep alive");
241                        break;
242                    }
243                    Err(_) => {
244                        error!("Refresh lease timeout");
245                        break;
246                    }
247                }
248            }
249
250            if self
251                .is_leader
252                .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
253                .is_ok()
254            {
255                if let Err(e) = self
256                    .leader_watcher
257                    .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
258                {
259                    error!(e; "Failed to send leader change message");
260                }
261            }
262        }
263
264        Ok(())
265    }
266
267    async fn leader(&self) -> Result<LeaderValue> {
268        if self.is_leader.load(Ordering::Relaxed) {
269            Ok(self.leader_value.as_bytes().into())
270        } else {
271            let res = self
272                .client
273                .election_client()
274                .leader(self.election_key())
275                .await
276                .context(error::EtcdFailedSnafu)?;
277            let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
278            Ok(leader_value.into())
279        }
280    }
281
    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`; callers must not invoke this on an
    /// `EtcdElection`.
    async fn resign(&self) -> Result<()> {
        todo!()
    }
285
286    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
287        self.leader_watcher.subscribe()
288    }
289}
290
291impl EtcdElection {
292    async fn keep_alive(
293        &self,
294        keeper: &mut LeaseKeeper,
295        receiver: &mut LeaseKeepAliveStream,
296        leader: EtcdLeaderKey,
297    ) -> Result<()> {
298        keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
299        if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
300            ensure!(
301                res.ttl() > 0,
302                error::UnexpectedSnafu {
303                    violated: "Failed to refresh the lease",
304                }
305            );
306
307            // Only after a successful `keep_alive` is the leader considered official.
308            if self
309                .is_leader
310                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
311                .is_ok()
312            {
313                self.infancy.store(true, Ordering::Relaxed);
314
315                if let Err(e) = self
316                    .leader_watcher
317                    .send(LeaderChangeMessage::Elected(Arc::new(leader)))
318                {
319                    error!(e; "Failed to send leader change message");
320                }
321            }
322        }
323
324        Ok(())
325    }
326}