meta_srv/
election.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod etcd;
16#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
17pub mod rds;
18
19use std::fmt::{self, Debug};
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use common_telemetry::{error, info, warn};
24use tokio::sync::broadcast::error::RecvError;
25use tokio::sync::broadcast::{self, Receiver, Sender};
26
27use crate::error::Result;
28use crate::metasrv::MetasrvNodeInfo;
29
/// Candidate lease length, in seconds (600 s = 10 minutes).
// NOTE(review): not used in this file; presumably consumed by the `etcd`/`rds`
// election backends when registering candidates — confirm there.
pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;
/// Keep-alive interval, in seconds; defined as half of [`CANDIDATE_LEASE_SECS`].
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
32
/// Messages sent when the leader changes.
///
/// Both variants carry the shared [`LeaderKey`] involved in the change.
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
    /// A node has been elected as leader; holds the key of that leadership.
    Elected(Arc<dyn LeaderKey>),
    /// The leader is stepping down; holds the key of the leadership being given up.
    StepDown(Arc<dyn LeaderKey>),
}
39
/// LeaderKey is a key that represents the leader of metasrv.
/// The structure corresponds to [etcd_client::LeaderKey].
pub trait LeaderKey: Send + Sync + Debug {
    /// The name, in bytes. The name is the election identifier that corresponds to the
    /// leadership key.
    fn name(&self) -> &[u8];

    /// The key, in bytes. The key is an opaque value representing the ownership of the
    /// election. If the key is deleted, then leadership is lost.
    fn key(&self) -> &[u8];

    /// The creation revision of the key.
    fn revision(&self) -> i64;

    /// The lease ID of the election leader.
    fn lease_id(&self) -> i64;
}
56
57impl fmt::Display for LeaderChangeMessage {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        let leader_key = match self {
60            LeaderChangeMessage::Elected(leader_key) => {
61                write!(f, "Elected(")?;
62                leader_key
63            }
64            LeaderChangeMessage::StepDown(leader_key) => {
65                write!(f, "StepDown(")?;
66                leader_key
67            }
68        };
69        write!(f, "LeaderKey {{ ")?;
70        write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
71        write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
72        write!(f, ", rev: {}", leader_key.revision())?;
73        write!(f, ", lease: {}", leader_key.lease_id())?;
74        write!(f, " }})")
75    }
76}
77
78fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
79    let (tx, mut rx) = broadcast::channel(100);
80    let _handle = common_runtime::spawn_global(async move {
81        loop {
82            match rx.recv().await {
83                Ok(msg) => match msg {
84                    LeaderChangeMessage::Elected(key) => {
85                        info!(
86                            "[{leader_value}] is elected as leader: {:?}, lease: {}",
87                            String::from_utf8_lossy(key.name()),
88                            key.lease_id()
89                        );
90                    }
91                    LeaderChangeMessage::StepDown(key) => {
92                        warn!(
93                            "[{leader_value}] is stepping down: {:?}, lease: {}",
94                            String::from_utf8_lossy(key.name()),
95                            key.lease_id()
96                        );
97                    }
98                },
99                Err(RecvError::Lagged(_)) => {
100                    warn!("Log printing is too slow or leader changed too fast!");
101                }
102                Err(RecvError::Closed) => break,
103            }
104        }
105    });
106    tx
107}
108
109/// Sends a leader change message to the channel and sets the `is_leader` flag.
110/// If a leader is elected, it will also set the `leader_infancy` flag to true.
111fn send_leader_change_and_set_flags(
112    is_leader: &AtomicBool,
113    leader_infancy: &AtomicBool,
114    tx: &Sender<LeaderChangeMessage>,
115    msg: LeaderChangeMessage,
116) {
117    let is_elected = matches!(msg, LeaderChangeMessage::Elected(_));
118    if is_leader
119        .compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire)
120        .is_ok()
121    {
122        if is_elected {
123            leader_infancy.store(true, Ordering::Release);
124        }
125        if let Err(e) = tx.send(msg) {
126            error!(e; "Failed to send leader change message");
127        }
128    }
129}
130
/// Interface of a leader election: registering candidates, campaigning for and
/// resigning leadership, and observing leader changes.
#[async_trait::async_trait]
pub trait Election: Send + Sync {
    /// The type returned by [`Election::leader`] to describe the current leader.
    type Leader;

    /// Returns `true` if current node is the leader.
    fn is_leader(&self) -> bool;

    /// When a new leader is born, it may need some initialization
    /// operations (asynchronous), this method tells us when these
    /// initialization operations can be performed.
    ///
    /// note: a new leader will only return true on the first call.
    fn in_leader_infancy(&self) -> bool;

    /// Registers a candidate for the election.
    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;

    /// Gets all candidates in the election.
    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;

    /// Campaign waits to acquire leadership in an election.
    ///
    /// Multiple sessions can participate in the election,
    /// but only one can be the leader at a time.
    async fn campaign(&self) -> Result<()>;

    /// Resets the campaign.
    ///
    /// Reset the client and the leader flag if needed.
    ///
    /// The default implementation does nothing.
    async fn reset_campaign(&self) {}

    /// Returns the leader value for the current election.
    async fn leader(&self) -> Result<Self::Leader>;

    /// Releases election leadership so other campaigners may
    /// acquire leadership on the election.
    async fn resign(&self) -> Result<()>;

    /// Returns a broadcast [`Receiver`] of [`LeaderChangeMessage`]s.
    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
}