meta_srv/
election.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod etcd;
16#[cfg(feature = "mysql_kvbackend")]
17pub mod mysql;
18#[cfg(feature = "pg_kvbackend")]
19pub mod postgres;
20
21use std::fmt::{self, Debug};
22use std::sync::Arc;
23
24use common_telemetry::{info, warn};
25use tokio::sync::broadcast::error::RecvError;
26use tokio::sync::broadcast::{self, Receiver, Sender};
27
28use crate::error::Result;
29use crate::metasrv::MetasrvNodeInfo;
30
/// Key name of the metasrv election.
///
/// NOTE(review): presumably the key the election backends campaign on —
/// confirm against the backend implementations.
pub const ELECTION_KEY: &str = "__metasrv_election";
/// Root path for election candidates. Note the trailing `/`.
///
/// NOTE(review): presumably used as a key prefix for per-candidate entries —
/// confirm against the backend implementations.
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// Candidate lease length, in seconds (as the `_SECS` suffix indicates).
pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;
/// Keep-alive interval, in seconds: half of [`CANDIDATE_LEASE_SECS`].
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
36
37/// Messages sent when the leader changes.
38#[derive(Debug, Clone)]
39pub enum LeaderChangeMessage {
40    Elected(Arc<dyn LeaderKey>),
41    StepDown(Arc<dyn LeaderKey>),
42}
43
/// `LeaderKey` is a key that represents the leader of metasrv.
///
/// The structure corresponds to [etcd_client::LeaderKey].
pub trait LeaderKey: Send + Sync + Debug {
    /// The name, in bytes. The name is the election identifier that corresponds to the
    /// leadership key.
    fn name(&self) -> &[u8];

    /// The key, in bytes. The key is an opaque key representing the ownership of the
    /// election. If the key is deleted, then leadership is lost.
    fn key(&self) -> &[u8];

    /// The creation revision of the key.
    fn revision(&self) -> i64;

    /// The lease ID of the election leader.
    fn lease_id(&self) -> i64;
}
60
61impl fmt::Display for LeaderChangeMessage {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        let leader_key = match self {
64            LeaderChangeMessage::Elected(leader_key) => {
65                write!(f, "Elected(")?;
66                leader_key
67            }
68            LeaderChangeMessage::StepDown(leader_key) => {
69                write!(f, "StepDown(")?;
70                leader_key
71            }
72        };
73        write!(f, "LeaderKey {{ ")?;
74        write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
75        write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
76        write!(f, ", rev: {}", leader_key.revision())?;
77        write!(f, ", lease: {}", leader_key.lease_id())?;
78        write!(f, " }})")
79    }
80}
81
82fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
83    let (tx, mut rx) = broadcast::channel(100);
84    let _handle = common_runtime::spawn_global(async move {
85        loop {
86            match rx.recv().await {
87                Ok(msg) => match msg {
88                    LeaderChangeMessage::Elected(key) => {
89                        info!(
90                            "[{leader_value}] is elected as leader: {:?}, lease: {}",
91                            String::from_utf8_lossy(key.name()),
92                            key.lease_id()
93                        );
94                    }
95                    LeaderChangeMessage::StepDown(key) => {
96                        warn!(
97                            "[{leader_value}] is stepping down: {:?}, lease: {}",
98                            String::from_utf8_lossy(key.name()),
99                            key.lease_id()
100                        );
101                    }
102                },
103                Err(RecvError::Lagged(_)) => {
104                    warn!("Log printing is too slow or leader changed too fast!");
105                }
106                Err(RecvError::Closed) => break,
107            }
108        }
109    });
110    tx
111}
112
113#[async_trait::async_trait]
114pub trait Election: Send + Sync {
115    type Leader;
116
117    /// Returns `true` if current node is the leader.
118    fn is_leader(&self) -> bool;
119
120    /// When a new leader is born, it may need some initialization
121    /// operations (asynchronous), this method tells us when these
122    /// initialization operations can be performed.
123    ///
124    /// note: a new leader will only return true on the first call.
125    fn in_leader_infancy(&self) -> bool;
126
127    /// Registers a candidate for the election.
128    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
129
130    /// Gets all candidates in the election.
131    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
132
133    /// Campaign waits to acquire leadership in an election.
134    ///
135    /// Multiple sessions can participate in the election,
136    /// but only one can be the leader at a time.
137    async fn campaign(&self) -> Result<()>;
138
139    /// Returns the leader value for the current election.
140    async fn leader(&self) -> Result<Self::Leader>;
141
142    /// Releases election leadership so other campaigners may
143    /// acquire leadership on the election.
144    async fn resign(&self) -> Result<()>;
145
146    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
147}