meta_srv/election/
etcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
20use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
21use common_telemetry::{error, info, warn};
22use etcd_client::{
23    Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
24};
25use snafu::{ensure, OptionExt, ResultExt};
26use tokio::sync::broadcast;
27use tokio::sync::broadcast::Receiver;
28use tokio::time::{timeout, MissedTickBehavior};
29
30use crate::election::{
31    listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
32    LeaderKey, CANDIDATE_LEASE_SECS, KEEP_ALIVE_INTERVAL_SECS,
33};
34use crate::error;
35use crate::error::Result;
36use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
37
38impl LeaderKey for EtcdLeaderKey {
39    fn name(&self) -> &[u8] {
40        self.name()
41    }
42
43    fn key(&self) -> &[u8] {
44        self.key()
45    }
46
47    fn revision(&self) -> i64 {
48        self.rev()
49    }
50
51    fn lease_id(&self) -> i64 {
52        self.lease()
53    }
54}
55
56pub struct EtcdElection {
57    leader_value: String,
58    client: Client,
59    is_leader: AtomicBool,
60    infancy: AtomicBool,
61    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
62    store_key_prefix: String,
63}
64
65impl EtcdElection {
66    pub async fn with_endpoints<E, S>(
67        leader_value: E,
68        endpoints: S,
69        store_key_prefix: String,
70    ) -> Result<ElectionRef>
71    where
72        E: AsRef<str>,
73        S: AsRef<[E]>,
74    {
75        let client = Client::connect(endpoints, None)
76            .await
77            .context(error::ConnectEtcdSnafu)?;
78
79        Self::with_etcd_client(leader_value, client, store_key_prefix).await
80    }
81
82    pub async fn with_etcd_client<E>(
83        leader_value: E,
84        client: Client,
85        store_key_prefix: String,
86    ) -> Result<ElectionRef>
87    where
88        E: AsRef<str>,
89    {
90        let leader_value: String = leader_value.as_ref().into();
91        let tx = listen_leader_change(leader_value.clone());
92        Ok(Arc::new(Self {
93            leader_value,
94            client,
95            is_leader: AtomicBool::new(false),
96            infancy: AtomicBool::new(false),
97            leader_watcher: tx,
98            store_key_prefix,
99        }))
100    }
101
102    fn election_key(&self) -> String {
103        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
104    }
105
106    fn candidate_root(&self) -> String {
107        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
108    }
109
110    fn candidate_key(&self) -> String {
111        format!("{}{}", self.candidate_root(), self.leader_value)
112    }
113}
114
115#[async_trait::async_trait]
116impl Election for EtcdElection {
117    type Leader = LeaderValue;
118
119    fn is_leader(&self) -> bool {
120        self.is_leader.load(Ordering::Relaxed)
121    }
122
123    fn in_leader_infancy(&self) -> bool {
124        self.infancy
125            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
126            .is_ok()
127    }
128
129    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
130        let mut lease_client = self.client.lease_client();
131        let res = lease_client
132            .grant(CANDIDATE_LEASE_SECS as i64, None)
133            .await
134            .context(error::EtcdFailedSnafu)?;
135        let lease_id = res.id();
136
137        // The register info: key is the candidate key, value is its node info(addr, version, git_commit).
138        let key = self.candidate_key().into_bytes();
139        let value = serde_json::to_string(node_info)
140            .with_context(|_| error::SerializeToJsonSnafu {
141                input: format!("{node_info:?}"),
142            })?
143            .into_bytes();
144        // Puts with the lease id
145        self.client
146            .kv_client()
147            .put(key, value, Some(PutOptions::new().with_lease(lease_id)))
148            .await
149            .context(error::EtcdFailedSnafu)?;
150
151        let (mut keeper, mut receiver) = lease_client
152            .keep_alive(lease_id)
153            .await
154            .context(error::EtcdFailedSnafu)?;
155
156        let mut keep_alive_interval =
157            tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
158
159        loop {
160            let _ = keep_alive_interval.tick().await;
161            keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
162
163            if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
164                if res.ttl() <= 0 {
165                    warn!("Candidate lease expired, key: {}", self.candidate_key());
166                    break;
167                }
168            }
169        }
170
171        Ok(())
172    }
173
174    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
175        let key = self.candidate_root().into_bytes();
176        let res = self
177            .client
178            .kv_client()
179            .get(key, Some(GetOptions::new().with_prefix()))
180            .await
181            .context(error::EtcdFailedSnafu)?;
182
183        let mut nodes = Vec::with_capacity(res.kvs().len());
184        for kv in res.kvs() {
185            let node =
186                serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
187                    error::DeserializeFromJsonSnafu {
188                        input: String::from_utf8_lossy(kv.value()),
189                    }
190                })?;
191            nodes.push(node);
192        }
193
194        Ok(nodes)
195    }
196
197    async fn campaign(&self) -> Result<()> {
198        let mut lease_client = self.client.lease_client();
199        let mut election_client = self.client.election_client();
200        let res = lease_client
201            .grant(META_LEASE_SECS as i64, None)
202            .await
203            .context(error::EtcdFailedSnafu)?;
204        let lease_id = res.id();
205
206        info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
207
208        // Campaign, waits to acquire leadership in an election, returning
209        // a LeaderKey representing the leadership if successful.
210        //
211        // The method will be blocked until the election is won, and after
212        // passing the method, it is necessary to execute `keep_alive` immediately
213        // to confirm that it is a valid leader, because it is possible that the
214        // election's lease expires.
215        let res = election_client
216            .campaign(self.election_key(), self.leader_value.clone(), lease_id)
217            .await
218            .context(error::EtcdFailedSnafu)?;
219
220        if let Some(leader) = res.leader() {
221            let (mut keeper, mut receiver) = lease_client
222                .keep_alive(lease_id)
223                .await
224                .context(error::EtcdFailedSnafu)?;
225
226            let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
227            let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
228            keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
229            loop {
230                // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
231                match timeout(
232                    keep_lease_duration,
233                    self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
234                )
235                .await
236                {
237                    Ok(Ok(())) => {
238                        let _ = keep_alive_interval.tick().await;
239                    }
240                    Ok(Err(err)) => {
241                        error!(err; "Failed to keep alive");
242                        break;
243                    }
244                    Err(_) => {
245                        error!("Refresh lease timeout");
246                        break;
247                    }
248                }
249            }
250
251            send_leader_change_and_set_flags(
252                &self.is_leader,
253                &self.infancy,
254                &self.leader_watcher,
255                LeaderChangeMessage::StepDown(Arc::new(leader.clone())),
256            );
257        }
258
259        Ok(())
260    }
261
262    async fn leader(&self) -> Result<LeaderValue> {
263        if self.is_leader.load(Ordering::Relaxed) {
264            Ok(self.leader_value.as_bytes().into())
265        } else {
266            let res = self
267                .client
268                .election_client()
269                .leader(self.election_key())
270                .await
271                .context(error::EtcdFailedSnafu)?;
272            let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
273            Ok(leader_value.into())
274        }
275    }
276
277    async fn resign(&self) -> Result<()> {
278        todo!()
279    }
280
281    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
282        self.leader_watcher.subscribe()
283    }
284}
285
286impl EtcdElection {
287    async fn keep_alive(
288        &self,
289        keeper: &mut LeaseKeeper,
290        receiver: &mut LeaseKeepAliveStream,
291        leader: EtcdLeaderKey,
292    ) -> Result<()> {
293        keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
294        if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
295            ensure!(
296                res.ttl() > 0,
297                error::UnexpectedSnafu {
298                    violated: "Failed to refresh the lease",
299                }
300            );
301
302            // Only after a successful `keep_alive` is the leader considered official.
303            send_leader_change_and_set_flags(
304                &self.is_leader,
305                &self.infancy,
306                &self.leader_watcher,
307                LeaderChangeMessage::Elected(Arc::new(leader.clone())),
308            );
309        }
310
311        Ok(())
312    }
313}