common_meta/
node_expiry_listener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Mutex;
16use std::time::Duration;
17
18use common_telemetry::{debug, error, info, warn};
19use tokio::task::JoinHandle;
20use tokio::time::{interval, MissedTickBehavior};
21
22use crate::cluster::{NodeInfo, NodeInfoKey};
23use crate::error;
24use crate::kv_backend::ResettableKvBackendRef;
25use crate::leadership_notifier::LeadershipChangeListener;
26use crate::rpc::store::RangeRequest;
27use crate::rpc::KeyValue;
28
/// [`NodeExpiryListener`] periodically checks all node info in memory and removes
/// expired node info to prevent memory leak.
///
/// The periodic check runs in a spawned background task that is started and
/// stopped through the listener's leadership callbacks.
pub struct NodeExpiryListener {
    /// Join handle of the background cleanup task; `None` while no task is running.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Idle threshold: a node whose last activity is older than this is treated as expired.
    max_idle_time: Duration,
    /// In-memory KV backend whose node info entries are scanned and deleted.
    in_memory: ResettableKvBackendRef,
}
36
/// Stops the background cleanup task when the listener is dropped, so the
/// spawned task does not keep running after its owner is gone.
impl Drop for NodeExpiryListener {
    fn drop(&mut self) {
        // `stop` aborts the task if one is currently registered; otherwise it is a no-op.
        self.stop();
    }
}
42
43impl NodeExpiryListener {
44    pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
45        Self {
46            handle: Mutex::new(None),
47            max_idle_time,
48            in_memory,
49        }
50    }
51
52    async fn start(&self) {
53        let mut handle = self.handle.lock().unwrap();
54        if handle.is_none() {
55            let in_memory = self.in_memory.clone();
56
57            let max_idle_time = self.max_idle_time;
58            let ticker_loop = tokio::spawn(async move {
59                // Run clean task every minute.
60                let mut interval = interval(Duration::from_secs(60));
61                interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
62                loop {
63                    interval.tick().await;
64                    if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
65                        error!(e; "Failed to clean expired node");
66                    }
67                }
68            });
69            *handle = Some(ticker_loop);
70        }
71    }
72
73    fn stop(&self) {
74        if let Some(handle) = self.handle.lock().unwrap().take() {
75            handle.abort();
76            info!("Node expiry listener stopped")
77        }
78    }
79
80    /// Cleans expired nodes from memory.
81    async fn clean_expired_nodes(
82        in_memory: &ResettableKvBackendRef,
83        max_idle_time: Duration,
84    ) -> error::Result<()> {
85        let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
86        for key in node_keys {
87            let key_bytes: Vec<u8> = (&key).into();
88            if let Err(e) = in_memory.delete(&key_bytes, false).await {
89                warn!(e; "Failed to delete expired node: {:?}", key_bytes);
90            } else {
91                debug!("Deleted expired node key: {:?}", key);
92            }
93        }
94        Ok(())
95    }
96
97    /// Lists expired nodes that have been inactive more than `max_idle_time`.
98    async fn list_expired_nodes(
99        in_memory: &ResettableKvBackendRef,
100        max_idle_time: Duration,
101    ) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
102        let prefix = NodeInfoKey::key_prefix();
103        let req = RangeRequest::new().with_prefix(prefix);
104        let current_time_millis = common_time::util::current_time_millis();
105        let resp = in_memory.range(req).await?;
106        Ok(resp
107            .kvs
108            .into_iter()
109            .filter_map(move |KeyValue { key, value }| {
110                let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
111                    warn!(e; "Unrecognized node info value");
112                }) else {
113                    return None;
114                };
115                if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
116                {
117                    NodeInfoKey::try_from(key)
118                        .inspect_err(|e| {
119                            warn!(e; "Unrecognized node info key: {:?}", info.peer);
120                        })
121                        .ok()
122                        .inspect(|node_key| {
123                            debug!("Found expired node: {:?}", node_key);
124                        })
125                } else {
126                    None
127                }
128            }))
129    }
130}
131
/// Ties the cleanup task's lifetime to leadership: the task is started when
/// leadership begins and stopped when it ends.
#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
    /// Returns the fixed name of this listener.
    fn name(&self) -> &str {
        "NodeExpiryListener"
    }

    /// Starts the background cleanup task (a no-op if it is already running)
    /// and logs the configured max idle time. Always returns `Ok(())`.
    async fn on_leader_start(&self) -> error::Result<()> {
        self.start().await;
        info!(
            "On leader start, node expiry listener started with max idle time: {:?}",
            self.max_idle_time
        );
        Ok(())
    }

    /// Stops the background cleanup task if one is running and logs the
    /// transition. Always returns `Ok(())`.
    async fn on_leader_stop(&self) -> error::Result<()> {
        self.stop();
        info!("On leader stop, node expiry listener stopped");
        Ok(())
    }
}