common_meta/
leadership_notifier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Mutex};
16
17use async_trait::async_trait;
18use common_telemetry::{error, info};
19
20use crate::error::Result;
21
22pub type LeadershipChangeNotifierCustomizerRef = Arc<dyn LeadershipChangeNotifierCustomizer>;
23
24/// A trait for customizing the leadership change notifier.
25pub trait LeadershipChangeNotifierCustomizer: Send + Sync {
26    fn customize(&self, notifier: &mut LeadershipChangeNotifier);
27
28    fn add_listener(&self, listener: Arc<dyn LeadershipChangeListener>);
29}
30
31/// A trait for handling leadership change events in a distributed system.
32#[async_trait]
33pub trait LeadershipChangeListener: Send + Sync {
34    /// Returns the listener name.
35    fn name(&self) -> &str;
36
37    /// Called when the node transitions to the leader role.
38    async fn on_leader_start(&self) -> Result<()>;
39
40    /// Called when the node transitions to the follower role.
41    async fn on_leader_stop(&self) -> Result<()>;
42}
43
44/// A notifier for leadership change events.
45#[derive(Default)]
46pub struct LeadershipChangeNotifier {
47    listeners: Vec<Arc<dyn LeadershipChangeListener>>,
48}
49
50#[derive(Default)]
51pub struct DefaultLeadershipChangeNotifierCustomizer {
52    listeners: Mutex<Vec<Arc<dyn LeadershipChangeListener>>>,
53}
54
55impl DefaultLeadershipChangeNotifierCustomizer {
56    pub fn new() -> Self {
57        Self {
58            listeners: Mutex::new(Vec::new()),
59        }
60    }
61}
62
63impl LeadershipChangeNotifierCustomizer for DefaultLeadershipChangeNotifierCustomizer {
64    fn customize(&self, notifier: &mut LeadershipChangeNotifier) {
65        info!("Customizing leadership change notifier");
66        let listeners = self.listeners.lock().unwrap().clone();
67        notifier.listeners.extend(listeners);
68    }
69
70    fn add_listener(&self, listener: Arc<dyn LeadershipChangeListener>) {
71        self.listeners.lock().unwrap().push(listener);
72    }
73}
74
75impl LeadershipChangeNotifier {
76    /// Adds a listener to the notifier.
77    pub fn add_listener(&mut self, listener: Arc<dyn LeadershipChangeListener>) {
78        self.listeners.push(listener);
79    }
80
81    /// Notify all listeners that the node has become a leader.
82    pub async fn notify_on_leader_start(&self) {
83        for listener in &self.listeners {
84            if let Err(err) = listener.on_leader_start().await {
85                error!(
86                    err;
87                    "Failed to notify listener: {}, event 'on_leader_start'",
88                    listener.name()
89                );
90            }
91        }
92    }
93
94    /// Notify all listeners that the node has become a follower.
95    pub async fn notify_on_leader_stop(&self) {
96        for listener in &self.listeners {
97            if let Err(err) = listener.on_leader_stop().await {
98                error!(
99                    err;
100                    "Failed to notify listener: {}, event: 'on_follower_start'",
101                    listener.name()
102                );
103            }
104        }
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use std::sync::atomic::{AtomicBool, Ordering};
111    use std::sync::Arc;
112
113    use super::*;
114
115    struct MockListener {
116        name: String,
117        on_leader_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
118        on_follower_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
119    }
120
121    #[async_trait::async_trait]
122    impl LeadershipChangeListener for MockListener {
123        fn name(&self) -> &str {
124            &self.name
125        }
126
127        async fn on_leader_start(&self) -> Result<()> {
128            if let Some(f) = &self.on_leader_start_fn {
129                return f();
130            }
131            Ok(())
132        }
133
134        async fn on_leader_stop(&self) -> Result<()> {
135            if let Some(f) = &self.on_follower_start_fn {
136                return f();
137            }
138            Ok(())
139        }
140    }
141
142    #[tokio::test]
143    async fn test_leadership_change_notifier() {
144        let mut notifier = LeadershipChangeNotifier::default();
145        let listener1 = Arc::new(MockListener {
146            name: "listener1".to_string(),
147            on_leader_start_fn: None,
148            on_follower_start_fn: None,
149        });
150        let called_on_leader_start = Arc::new(AtomicBool::new(false));
151        let called_on_follower_start = Arc::new(AtomicBool::new(false));
152        let called_on_leader_start_moved = called_on_leader_start.clone();
153        let called_on_follower_start_moved = called_on_follower_start.clone();
154        let listener2 = Arc::new(MockListener {
155            name: "listener2".to_string(),
156            on_leader_start_fn: Some(Box::new(move || {
157                called_on_leader_start_moved.store(true, Ordering::Relaxed);
158                Ok(())
159            })),
160            on_follower_start_fn: Some(Box::new(move || {
161                called_on_follower_start_moved.store(true, Ordering::Relaxed);
162                Ok(())
163            })),
164        });
165
166        notifier.add_listener(listener1);
167        notifier.add_listener(listener2);
168
169        let listener1 = notifier.listeners.first().unwrap();
170        let listener2 = notifier.listeners.get(1).unwrap();
171
172        assert_eq!(listener1.name(), "listener1");
173        assert_eq!(listener2.name(), "listener2");
174
175        notifier.notify_on_leader_start().await;
176        assert!(!called_on_follower_start.load(Ordering::Relaxed));
177        assert!(called_on_leader_start.load(Ordering::Relaxed));
178
179        notifier.notify_on_leader_stop().await;
180        assert!(called_on_follower_start.load(Ordering::Relaxed));
181        assert!(called_on_leader_start.load(Ordering::Relaxed));
182    }
183}