common_meta/
node_expiry_listener.rs1use std::sync::Mutex;
16use std::time::Duration;
17
18use common_telemetry::{debug, error, info, warn};
19use tokio::task::JoinHandle;
20use tokio::time::{interval, MissedTickBehavior};
21
22use crate::cluster::{NodeInfo, NodeInfoKey};
23use crate::error;
24use crate::kv_backend::ResettableKvBackendRef;
25use crate::leadership_notifier::LeadershipChangeListener;
26use crate::rpc::store::RangeRequest;
27use crate::rpc::KeyValue;
28
29pub struct NodeExpiryListener {
32 handle: Mutex<Option<JoinHandle<()>>>,
33 max_idle_time: Duration,
34 in_memory: ResettableKvBackendRef,
35}
36
37impl Drop for NodeExpiryListener {
38 fn drop(&mut self) {
39 self.stop();
40 }
41}
42
43impl NodeExpiryListener {
44 pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
45 Self {
46 handle: Mutex::new(None),
47 max_idle_time,
48 in_memory,
49 }
50 }
51
52 async fn start(&self) {
53 let mut handle = self.handle.lock().unwrap();
54 if handle.is_none() {
55 let in_memory = self.in_memory.clone();
56
57 let max_idle_time = self.max_idle_time;
58 let ticker_loop = tokio::spawn(async move {
59 let mut interval = interval(Duration::from_secs(60));
61 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
62 loop {
63 interval.tick().await;
64 if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
65 error!(e; "Failed to clean expired node");
66 }
67 }
68 });
69 *handle = Some(ticker_loop);
70 }
71 }
72
73 fn stop(&self) {
74 if let Some(handle) = self.handle.lock().unwrap().take() {
75 handle.abort();
76 info!("Node expiry listener stopped")
77 }
78 }
79
80 async fn clean_expired_nodes(
82 in_memory: &ResettableKvBackendRef,
83 max_idle_time: Duration,
84 ) -> error::Result<()> {
85 let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
86 for key in node_keys {
87 let key_bytes: Vec<u8> = (&key).into();
88 if let Err(e) = in_memory.delete(&key_bytes, false).await {
89 warn!(e; "Failed to delete expired node: {:?}", key_bytes);
90 } else {
91 debug!("Deleted expired node key: {:?}", key);
92 }
93 }
94 Ok(())
95 }
96
97 async fn list_expired_nodes(
99 in_memory: &ResettableKvBackendRef,
100 max_idle_time: Duration,
101 ) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
102 let prefix = NodeInfoKey::key_prefix();
103 let req = RangeRequest::new().with_prefix(prefix);
104 let current_time_millis = common_time::util::current_time_millis();
105 let resp = in_memory.range(req).await?;
106 Ok(resp
107 .kvs
108 .into_iter()
109 .filter_map(move |KeyValue { key, value }| {
110 let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
111 warn!(e; "Unrecognized node info value");
112 }) else {
113 return None;
114 };
115 if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
116 {
117 NodeInfoKey::try_from(key)
118 .inspect_err(|e| {
119 warn!(e; "Unrecognized node info key: {:?}", info.peer);
120 })
121 .ok()
122 .inspect(|node_key| {
123 debug!("Found expired node: {:?}", node_key);
124 })
125 } else {
126 None
127 }
128 }))
129 }
130}
131
132#[async_trait::async_trait]
133impl LeadershipChangeListener for NodeExpiryListener {
134 fn name(&self) -> &str {
135 "NodeExpiryListener"
136 }
137
138 async fn on_leader_start(&self) -> error::Result<()> {
139 self.start().await;
140 info!(
141 "On leader start, node expiry listener started with max idle time: {:?}",
142 self.max_idle_time
143 );
144 Ok(())
145 }
146
147 async fn on_leader_stop(&self) -> error::Result<()> {
148 self.stop();
149 info!("On leader stop, node expiry listener stopped");
150 Ok(())
151 }
152}