common_meta/
leadership_notifier.rs1use std::sync::{Arc, Mutex};
16
17use async_trait::async_trait;
18use common_telemetry::{error, info};
19
20use crate::error::Result;
21
22pub type LeadershipChangeNotifierCustomizerRef = Arc<dyn LeadershipChangeNotifierCustomizer>;
23
24pub trait LeadershipChangeNotifierCustomizer: Send + Sync {
26 fn customize(&self, notifier: &mut LeadershipChangeNotifier);
27
28 fn add_listener(&self, listener: Arc<dyn LeadershipChangeListener>);
29}
30
31#[async_trait]
33pub trait LeadershipChangeListener: Send + Sync {
34 fn name(&self) -> &str;
36
37 async fn on_leader_start(&self) -> Result<()>;
39
40 async fn on_leader_stop(&self) -> Result<()>;
42}
43
44#[derive(Default)]
46pub struct LeadershipChangeNotifier {
47 listeners: Vec<Arc<dyn LeadershipChangeListener>>,
48}
49
50#[derive(Default)]
51pub struct DefaultLeadershipChangeNotifierCustomizer {
52 listeners: Mutex<Vec<Arc<dyn LeadershipChangeListener>>>,
53}
54
55impl DefaultLeadershipChangeNotifierCustomizer {
56 pub fn new() -> Self {
57 Self {
58 listeners: Mutex::new(Vec::new()),
59 }
60 }
61}
62
63impl LeadershipChangeNotifierCustomizer for DefaultLeadershipChangeNotifierCustomizer {
64 fn customize(&self, notifier: &mut LeadershipChangeNotifier) {
65 info!("Customizing leadership change notifier");
66 let listeners = self.listeners.lock().unwrap().clone();
67 notifier.listeners.extend(listeners);
68 }
69
70 fn add_listener(&self, listener: Arc<dyn LeadershipChangeListener>) {
71 self.listeners.lock().unwrap().push(listener);
72 }
73}
74
75impl LeadershipChangeNotifier {
76 pub fn add_listener(&mut self, listener: Arc<dyn LeadershipChangeListener>) {
78 self.listeners.push(listener);
79 }
80
81 pub async fn notify_on_leader_start(&self) {
83 for listener in &self.listeners {
84 if let Err(err) = listener.on_leader_start().await {
85 error!(
86 err;
87 "Failed to notify listener: {}, event 'on_leader_start'",
88 listener.name()
89 );
90 }
91 }
92 }
93
94 pub async fn notify_on_leader_stop(&self) {
96 for listener in &self.listeners {
97 if let Err(err) = listener.on_leader_stop().await {
98 error!(
99 err;
100 "Failed to notify listener: {}, event: 'on_follower_start'",
101 listener.name()
102 );
103 }
104 }
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use std::sync::atomic::{AtomicBool, Ordering};
111 use std::sync::Arc;
112
113 use super::*;
114
115 struct MockListener {
116 name: String,
117 on_leader_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
118 on_follower_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
119 }
120
121 #[async_trait::async_trait]
122 impl LeadershipChangeListener for MockListener {
123 fn name(&self) -> &str {
124 &self.name
125 }
126
127 async fn on_leader_start(&self) -> Result<()> {
128 if let Some(f) = &self.on_leader_start_fn {
129 return f();
130 }
131 Ok(())
132 }
133
134 async fn on_leader_stop(&self) -> Result<()> {
135 if let Some(f) = &self.on_follower_start_fn {
136 return f();
137 }
138 Ok(())
139 }
140 }
141
142 #[tokio::test]
143 async fn test_leadership_change_notifier() {
144 let mut notifier = LeadershipChangeNotifier::default();
145 let listener1 = Arc::new(MockListener {
146 name: "listener1".to_string(),
147 on_leader_start_fn: None,
148 on_follower_start_fn: None,
149 });
150 let called_on_leader_start = Arc::new(AtomicBool::new(false));
151 let called_on_follower_start = Arc::new(AtomicBool::new(false));
152 let called_on_leader_start_moved = called_on_leader_start.clone();
153 let called_on_follower_start_moved = called_on_follower_start.clone();
154 let listener2 = Arc::new(MockListener {
155 name: "listener2".to_string(),
156 on_leader_start_fn: Some(Box::new(move || {
157 called_on_leader_start_moved.store(true, Ordering::Relaxed);
158 Ok(())
159 })),
160 on_follower_start_fn: Some(Box::new(move || {
161 called_on_follower_start_moved.store(true, Ordering::Relaxed);
162 Ok(())
163 })),
164 });
165
166 notifier.add_listener(listener1);
167 notifier.add_listener(listener2);
168
169 let listener1 = notifier.listeners.first().unwrap();
170 let listener2 = notifier.listeners.get(1).unwrap();
171
172 assert_eq!(listener1.name(), "listener1");
173 assert_eq!(listener2.name(), "listener2");
174
175 notifier.notify_on_leader_start().await;
176 assert!(!called_on_follower_start.load(Ordering::Relaxed));
177 assert!(called_on_leader_start.load(Ordering::Relaxed));
178
179 notifier.notify_on_leader_stop().await;
180 assert!(called_on_follower_start.load(Ordering::Relaxed));
181 assert!(called_on_leader_start.load(Ordering::Relaxed));
182 }
183}