meta_srv/election/etcd.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
20use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
21use common_telemetry::{error, info, warn};
22use etcd_client::{
23    Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
24};
25use snafu::{OptionExt, ResultExt, ensure};
26use tokio::sync::broadcast;
27use tokio::sync::broadcast::Receiver;
28use tokio::time::{MissedTickBehavior, timeout};
29
30use crate::election::{
31    CANDIDATE_LEASE_SECS, Election, KEEP_ALIVE_INTERVAL_SECS, LeaderChangeMessage, LeaderKey,
32    listen_leader_change, send_leader_change_and_set_flags,
33};
34use crate::error;
35use crate::error::Result;
36use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
37
38impl LeaderKey for EtcdLeaderKey {
39    fn name(&self) -> &[u8] {
40        self.name()
41    }
42
43    fn key(&self) -> &[u8] {
44        self.key()
45    }
46
47    fn revision(&self) -> i64 {
48        self.rev()
49    }
50
51    fn lease_id(&self) -> i64 {
52        self.lease()
53    }
54}
55
56pub struct EtcdElection {
57    leader_value: String,
58    client: Client,
59    is_leader: AtomicBool,
60    infancy: AtomicBool,
61    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
62    store_key_prefix: String,
63}
64
65impl EtcdElection {
66    pub async fn with_etcd_client<E>(
67        leader_value: E,
68        client: Client,
69        store_key_prefix: String,
70    ) -> Result<ElectionRef>
71    where
72        E: AsRef<str>,
73    {
74        let leader_value: String = leader_value.as_ref().into();
75        let tx = listen_leader_change(leader_value.clone());
76        Ok(Arc::new(Self {
77            leader_value,
78            client,
79            is_leader: AtomicBool::new(false),
80            infancy: AtomicBool::new(false),
81            leader_watcher: tx,
82            store_key_prefix,
83        }))
84    }
85
86    fn election_key(&self) -> String {
87        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
88    }
89
90    fn candidate_root(&self) -> String {
91        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
92    }
93
94    fn candidate_key(&self) -> String {
95        format!("{}{}", self.candidate_root(), self.leader_value)
96    }
97}
98
99#[async_trait::async_trait]
100impl Election for EtcdElection {
101    type Leader = LeaderValue;
102
103    fn is_leader(&self) -> bool {
104        self.is_leader.load(Ordering::Relaxed)
105    }
106
107    fn in_leader_infancy(&self) -> bool {
108        self.infancy
109            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
110            .is_ok()
111    }
112
113    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
114        let mut lease_client = self.client.lease_client();
115        let res: etcd_client::LeaseGrantResponse = lease_client
116            .grant(CANDIDATE_LEASE_SECS as i64, None)
117            .await
118            .context(error::EtcdFailedSnafu)?;
119        let lease_id = res.id();
120
121        // The register info: key is the candidate key, value is its node info(addr, version, git_commit).
122        let key = self.candidate_key().into_bytes();
123        let value = serde_json::to_string(node_info)
124            .with_context(|_| error::SerializeToJsonSnafu {
125                input: format!("{node_info:?}"),
126            })?
127            .into_bytes();
128        // Puts with the lease id
129        self.client
130            .kv_client()
131            .put(key, value, Some(PutOptions::new().with_lease(lease_id)))
132            .await
133            .context(error::EtcdFailedSnafu)?;
134
135        let (mut keeper, mut receiver) = lease_client
136            .keep_alive(lease_id)
137            .await
138            .context(error::EtcdFailedSnafu)?;
139
140        let mut keep_alive_interval =
141            tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
142
143        loop {
144            let _ = keep_alive_interval.tick().await;
145            keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
146
147            if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)?
148                && res.ttl() <= 0
149            {
150                warn!("Candidate lease expired, key: {}", self.candidate_key());
151                break;
152            }
153        }
154
155        Ok(())
156    }
157
158    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
159        let key = self.candidate_root().into_bytes();
160        let res = self
161            .client
162            .kv_client()
163            .get(key, Some(GetOptions::new().with_prefix()))
164            .await
165            .context(error::EtcdFailedSnafu)?;
166
167        let mut nodes = Vec::with_capacity(res.kvs().len());
168        for kv in res.kvs() {
169            let node =
170                serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
171                    error::DeserializeFromJsonSnafu {
172                        input: String::from_utf8_lossy(kv.value()),
173                    }
174                })?;
175            nodes.push(node);
176        }
177
178        Ok(nodes)
179    }
180
181    async fn campaign(&self) -> Result<()> {
182        let mut lease_client = self.client.lease_client();
183        let mut election_client = self.client.election_client();
184        let res = lease_client
185            .grant(META_LEASE_SECS as i64, None)
186            .await
187            .context(error::EtcdFailedSnafu)?;
188        let lease_id = res.id();
189
190        info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id);
191
192        // Campaign, waits to acquire leadership in an election, returning
193        // a LeaderKey representing the leadership if successful.
194        //
195        // The method will be blocked until the election is won, and after
196        // passing the method, it is necessary to execute `keep_alive` immediately
197        // to confirm that it is a valid leader, because it is possible that the
198        // election's lease expires.
199        let res = election_client
200            .campaign(self.election_key(), self.leader_value.clone(), lease_id)
201            .await
202            .context(error::EtcdFailedSnafu)?;
203
204        if let Some(leader) = res.leader() {
205            let (mut keeper, mut receiver) = lease_client
206                .keep_alive(lease_id)
207                .await
208                .context(error::EtcdFailedSnafu)?;
209
210            let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
211            let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
212            keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
213            loop {
214                // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
215                match timeout(
216                    keep_lease_duration,
217                    self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
218                )
219                .await
220                {
221                    Ok(Ok(())) => {
222                        let _ = keep_alive_interval.tick().await;
223                    }
224                    Ok(Err(err)) => {
225                        error!(err; "Failed to keep alive");
226                        break;
227                    }
228                    Err(_) => {
229                        error!("Refresh lease timeout");
230                        break;
231                    }
232                }
233            }
234
235            send_leader_change_and_set_flags(
236                &self.is_leader,
237                &self.infancy,
238                &self.leader_watcher,
239                LeaderChangeMessage::StepDown(Arc::new(leader.clone())),
240            );
241        }
242
243        Ok(())
244    }
245
246    async fn leader(&self) -> Result<LeaderValue> {
247        if self.is_leader.load(Ordering::Relaxed) {
248            Ok(self.leader_value.as_bytes().into())
249        } else {
250            let res = self
251                .client
252                .election_client()
253                .leader(self.election_key())
254                .await
255                .context(error::EtcdFailedSnafu)?;
256            let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
257            Ok(leader_value.into())
258        }
259    }
260
261    async fn resign(&self) -> Result<()> {
262        todo!()
263    }
264
265    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
266        self.leader_watcher.subscribe()
267    }
268}
269
270impl EtcdElection {
271    async fn keep_alive(
272        &self,
273        keeper: &mut LeaseKeeper,
274        receiver: &mut LeaseKeepAliveStream,
275        leader: EtcdLeaderKey,
276    ) -> Result<()> {
277        keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
278        if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
279            ensure!(
280                res.ttl() > 0,
281                error::UnexpectedSnafu {
282                    violated: "Failed to refresh the lease",
283                }
284            );
285
286            // Only after a successful `keep_alive` is the leader considered official.
287            send_leader_change_and_set_flags(
288                &self.is_leader,
289                &self.infancy,
290                &self.leader_watcher,
291                LeaderChangeMessage::Elected(Arc::new(leader.clone())),
292            );
293        }
294
295        Ok(())
296    }
297}