Skip to main content

common_meta/
election.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod etcd;
16#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
17pub mod rds;
18
19use std::fmt::{self, Debug};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use common_telemetry::{error, info, warn};
24use serde::{Deserialize, Serialize};
25use tokio::sync::broadcast::error::RecvError;
26use tokio::sync::broadcast::{self, Receiver, Sender};
27
28use crate::error::Result;
29
/// Candidate lease duration, in seconds (10 minutes).
pub const CANDIDATE_LEASE_SECS: u64 = 10 * 60;
/// Keep-alive interval, in seconds: half of [`CANDIDATE_LEASE_SECS`] (300 s).
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
32
/// The value of the leader, used to store the leader's address as text.
///
/// It can be built from any byte-like input. Invalid UTF-8 sequences are replaced with
/// U+FFFD (the replacement character) rather than rejected.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let bytes: &[u8] = value.as_ref();
        LeaderValue(String::from_utf8_lossy(bytes).into_owned())
    }
}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MetasrvNodeInfo {
45    // The metasrv's address
46    pub addr: String,
47    // The node build version
48    pub version: String,
49    // The node build git commit hash
50    pub git_commit: String,
51    // The node start timestamp in milliseconds
52    pub start_time_ms: u64,
53    // The node total cpu millicores
54    #[serde(default)]
55    pub total_cpu_millicores: i64,
56    // The node total memory bytes
57    #[serde(default)]
58    pub total_memory_bytes: i64,
59    /// The node build cpu usage millicores
60    #[serde(default)]
61    pub cpu_usage_millicores: i64,
62    /// The node build memory usage bytes
63    #[serde(default)]
64    pub memory_usage_bytes: i64,
65    // The node hostname
66    #[serde(default)]
67    pub hostname: String,
68}
69
70// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated top-level fields are removed from the proto.
71#[allow(deprecated)]
72impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
73    fn from(node_info: MetasrvNodeInfo) -> Self {
74        Self {
75            peer: Some(api::v1::meta::Peer {
76                addr: node_info.addr,
77                ..Default::default()
78            }),
79            // TODO(zyy17): The following top-level fields are deprecated. They are kept for backward compatibility and will be removed in a future version.
80            // New code should use the fields in `info.NodeInfo` instead.
81            version: node_info.version.clone(),
82            git_commit: node_info.git_commit.clone(),
83            start_time_ms: node_info.start_time_ms,
84            cpus: node_info.total_cpu_millicores as u32,
85            memory_bytes: node_info.total_memory_bytes as u64,
86            // The canonical location for node information.
87            info: Some(api::v1::meta::NodeInfo {
88                version: node_info.version,
89                git_commit: node_info.git_commit,
90                start_time_ms: node_info.start_time_ms,
91                total_cpu_millicores: node_info.total_cpu_millicores,
92                total_memory_bytes: node_info.total_memory_bytes,
93                cpu_usage_millicores: node_info.cpu_usage_millicores,
94                memory_usage_bytes: node_info.memory_usage_bytes,
95                cpus: node_info.total_cpu_millicores as u32,
96                memory_bytes: node_info.total_memory_bytes as u64,
97                hostname: node_info.hostname,
98            }),
99        }
100    }
101}
102
103/// Messages sent when the leader changes.
104#[derive(Debug, Clone)]
105pub enum LeaderChangeMessage {
106    Elected(Arc<dyn LeaderKey>),
107    StepDown(Arc<dyn LeaderKey>),
108}
109
/// Key that represents the leader of metasrv.
///
/// Its shape corresponds to [etcd_client::LeaderKey].
pub trait LeaderKey: Send + Sync + Debug {
    /// Raw bytes of the election identifier that the leadership key belongs to.
    fn name(&self) -> &[u8];

    /// Raw bytes of an opaque key representing ownership of the election.
    ///
    /// Leadership is lost once this key is deleted.
    fn key(&self) -> &[u8];

    /// Revision at which the key was created.
    fn revision(&self) -> i64;

    /// Lease ID held by the election leader.
    fn lease_id(&self) -> i64;
}
126
127impl fmt::Display for LeaderChangeMessage {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        let leader_key = match self {
130            LeaderChangeMessage::Elected(leader_key) => {
131                write!(f, "Elected(")?;
132                leader_key
133            }
134            LeaderChangeMessage::StepDown(leader_key) => {
135                write!(f, "StepDown(")?;
136                leader_key
137            }
138        };
139        write!(f, "LeaderKey {{ ")?;
140        write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
141        write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
142        write!(f, ", rev: {}", leader_key.revision())?;
143        write!(f, ", lease: {}", leader_key.lease_id())?;
144        write!(f, " }})")
145    }
146}
147
148fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
149    let (tx, mut rx) = broadcast::channel(100);
150    let _handle = common_runtime::spawn_global(async move {
151        loop {
152            match rx.recv().await {
153                Ok(msg) => match msg {
154                    LeaderChangeMessage::Elected(key) => {
155                        info!(
156                            "[{leader_value}] is elected as leader: {:?}, lease: {}",
157                            String::from_utf8_lossy(key.name()),
158                            key.lease_id()
159                        );
160                    }
161                    LeaderChangeMessage::StepDown(key) => {
162                        warn!(
163                            "[{leader_value}] is stepping down: {:?}, lease: {}",
164                            String::from_utf8_lossy(key.name()),
165                            key.lease_id()
166                        );
167                    }
168                },
169                Err(RecvError::Lagged(_)) => {
170                    warn!("Log printing is too slow or leader changed too fast!");
171                }
172                Err(RecvError::Closed) => break,
173            }
174        }
175    });
176    tx
177}
178
179/// Sends a leader change message to the channel and sets the `is_leader` flag.
180/// If a leader is elected, it will also set the `leader_infancy` flag to true.
181fn send_leader_change_and_set_flags(
182    is_leader: &AtomicBool,
183    leader_infancy: &AtomicBool,
184    tx: &Sender<LeaderChangeMessage>,
185    msg: LeaderChangeMessage,
186) {
187    let is_elected = matches!(msg, LeaderChangeMessage::Elected(_));
188    if is_leader
189        .compare_exchange(!is_elected, is_elected, Ordering::AcqRel, Ordering::Acquire)
190        .is_ok()
191    {
192        if is_elected {
193            leader_infancy.store(true, Ordering::Release);
194        }
195        if let Err(e) = tx.send(msg) {
196            error!(e; "Failed to send leader change message");
197        }
198    }
199}
200
201#[async_trait::async_trait]
202pub trait Election: Send + Sync {
203    type Leader;
204
205    /// Returns `true` if current node is the leader.
206    fn is_leader(&self) -> bool;
207
208    /// When a new leader is born, it may need some initialization
209    /// operations (asynchronous), this method tells us when these
210    /// initialization operations can be performed.
211    ///
212    /// note: a new leader will only return true on the first call.
213    fn in_leader_infancy(&self) -> bool;
214
215    /// Registers a candidate for the election.
216    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
217
218    /// Gets all candidates in the election.
219    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
220
221    /// Campaign waits to acquire leadership in an election.
222    ///
223    /// Multiple sessions can participate in the election,
224    /// but only one can be the leader at a time.
225    async fn campaign(&self) -> Result<()>;
226
227    /// Resets the campaign.
228    ///
229    /// Reset the client and the leader flag if needed.
230    async fn reset_campaign(&self) {}
231
232    /// Returns the leader value for the current election.
233    async fn leader(&self) -> Result<Self::Leader>;
234
235    /// Releases election leadership so other campaigners may
236    /// acquire leadership on the election.
237    async fn resign(&self) -> Result<()>;
238
239    fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
240}
241
242pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;