common_meta/key/
runtime_switch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use common_error::ext::BoxedError;
19use common_procedure::local::PauseAware;
20use moka::future::Cache;
21use snafu::ResultExt;
22
23use crate::error::{GetCacheSnafu, Result};
24use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY, RECOVERY_MODE_KEY};
25use crate::kv_backend::KvBackendRef;
26use crate::rpc::store::{BatchDeleteRequest, PutRequest};
27
/// Shared, reference-counted handle to a [`RuntimeSwitchManager`].
pub type RuntimeSwitchManagerRef = Arc<RuntimeSwitchManager>;
29
/// The runtime switch manager.
///
/// Used to enable or disable runtime switches. A switch counts as enabled
/// while its key is present in the kv backend; lookups are served through a
/// cache that this manager invalidates on its own writes and deletes.
#[derive(Clone)]
pub struct RuntimeSwitchManager {
    /// Backend in which the switch keys are stored.
    kv_backend: KvBackendRef,
    /// Cached backend lookups, keyed by the raw switch key. A cached `None`
    /// records that the key was absent when it was looked up.
    cache: Cache<Vec<u8>, Option<Vec<u8>>>,
}
38
/// Exposes the procedure-pause switch through the [`PauseAware`] trait.
#[async_trait::async_trait]
impl PauseAware for RuntimeSwitchManager {
    /// Reports whether procedures are currently paused.
    ///
    /// Delegates to `is_procedure_paused` and boxes any error it returns into
    /// a [`BoxedError`].
    async fn is_paused(&self) -> std::result::Result<bool, BoxedError> {
        self.is_procedure_paused().await.map_err(BoxedError::new)
    }
}
45
/// Time-to-live applied to every entry of the switch lookup cache.
const CACHE_TTL: Duration = Duration::from_secs(10);
/// Maximum capacity handed to the switch lookup cache builder.
const MAX_CAPACITY: u64 = 32;
48
49impl RuntimeSwitchManager {
50    pub fn new(kv_backend: KvBackendRef) -> Self {
51        let cache = Cache::builder()
52            .time_to_live(CACHE_TTL)
53            .max_capacity(MAX_CAPACITY)
54            .build();
55        Self { kv_backend, cache }
56    }
57
58    async fn put_key(&self, key: &str) -> Result<()> {
59        let req = PutRequest {
60            key: Vec::from(key),
61            value: vec![],
62            prev_kv: false,
63        };
64        self.kv_backend.put(req).await?;
65        self.cache.invalidate(key.as_bytes()).await;
66        Ok(())
67    }
68
69    async fn delete_keys(&self, keys: &[&str]) -> Result<()> {
70        let req = BatchDeleteRequest::new()
71            .with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect());
72        self.kv_backend.batch_delete(req).await?;
73        for key in keys {
74            self.cache.invalidate(key.as_bytes()).await;
75        }
76        Ok(())
77    }
78
79    /// Returns true if the key exists.
80    async fn exists(&self, key: &str) -> Result<bool> {
81        let key = key.as_bytes().to_vec();
82        let kv_backend = self.kv_backend.clone();
83        let value = self
84            .cache
85            .try_get_with(key.clone(), async move {
86                kv_backend.get(&key).await.map(|v| v.map(|v| v.value))
87            })
88            .await
89            .context(GetCacheSnafu)?;
90
91        Ok(value.is_some())
92    }
93
94    /// Enables maintenance mode.
95    pub async fn set_maintenance_mode(&self) -> Result<()> {
96        self.put_key(MAINTENANCE_KEY).await
97    }
98
99    /// Unsets maintenance mode.
100    pub async fn unset_maintenance_mode(&self) -> Result<()> {
101        self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY])
102            .await
103    }
104
105    /// Returns true if maintenance mode is enabled.
106    pub async fn maintenance_mode(&self) -> Result<bool> {
107        let exists = self.exists(MAINTENANCE_KEY).await?;
108        if exists {
109            return Ok(true);
110        }
111
112        let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?;
113        if exists {
114            return Ok(true);
115        }
116
117        Ok(false)
118    }
119
120    // Pauses handling of incoming procedure requests.
121    pub async fn pasue_procedure(&self) -> Result<()> {
122        self.put_key(PAUSE_PROCEDURE_KEY).await
123    }
124
125    /// Resumes processing of incoming procedure requests.
126    pub async fn resume_procedure(&self) -> Result<()> {
127        self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await
128    }
129
130    /// Returns true if the system is currently pausing incoming procedure requests.
131    pub async fn is_procedure_paused(&self) -> Result<bool> {
132        self.exists(PAUSE_PROCEDURE_KEY).await
133    }
134
135    /// Enables recovery mode.
136    pub async fn set_recovery_mode(&self) -> Result<()> {
137        self.put_key(RECOVERY_MODE_KEY).await
138    }
139
140    /// Unsets recovery mode.
141    pub async fn unset_recovery_mode(&self) -> Result<()> {
142        self.delete_keys(&[RECOVERY_MODE_KEY]).await
143    }
144
145    /// Returns true if the system is currently in recovery mode.
146    pub async fn recovery_mode(&self) -> Result<bool> {
147        self.exists(RECOVERY_MODE_KEY).await
148    }
149}
150
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::key::runtime_switch::RuntimeSwitchManager;
    use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY};
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Builds a manager on top of a fresh, empty in-memory kv backend.
    fn new_manager() -> Arc<RuntimeSwitchManager> {
        Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())))
    }

    #[tokio::test]
    async fn test_runtime_switch_manager_basic() {
        let manager = new_manager();
        let cache_key = MAINTENANCE_KEY.as_bytes();

        // Writing a key leaves the cache empty for that key.
        manager.put_key(MAINTENANCE_KEY).await.unwrap();
        assert!(manager.cache.get(cache_key).await.is_none());

        // A lookup populates the cache.
        manager.exists(MAINTENANCE_KEY).await.unwrap();
        assert!(manager.cache.get(cache_key).await.is_some());

        // Deleting the key evicts the cached entry again.
        manager.delete_keys(&[MAINTENANCE_KEY]).await.unwrap();
        assert!(manager.cache.get(cache_key).await.is_none());
    }

    #[tokio::test]
    async fn test_runtime_switch_manager() {
        let manager = new_manager();

        // Off by default, on after set, off again after unset.
        assert!(!manager.maintenance_mode().await.unwrap());
        manager.set_maintenance_mode().await.unwrap();
        assert!(manager.maintenance_mode().await.unwrap());
        manager.unset_maintenance_mode().await.unwrap();
        assert!(!manager.maintenance_mode().await.unwrap());
    }

    #[tokio::test]
    async fn test_runtime_switch_manager_with_legacy_key() {
        // Seed the backend with only the legacy key before the manager exists.
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let seed = PutRequest {
            key: Vec::from(LEGACY_MAINTENANCE_KEY),
            value: vec![],
            prev_kv: false,
        };
        kv_backend.put(seed).await.unwrap();
        let manager = Arc::new(RuntimeSwitchManager::new(kv_backend));

        // The legacy key alone turns maintenance mode on.
        assert!(manager.maintenance_mode().await.unwrap());

        // Unsetting clears the legacy key as well.
        manager.unset_maintenance_mode().await.unwrap();
        assert!(!manager.maintenance_mode().await.unwrap());

        // Setting it again works through the current key.
        manager.set_maintenance_mode().await.unwrap();
        assert!(manager.maintenance_mode().await.unwrap());
    }

    #[tokio::test]
    async fn test_pasue_procedure() {
        let manager = new_manager();

        manager.pasue_procedure().await.unwrap();
        assert!(manager.is_procedure_paused().await.unwrap());

        manager.resume_procedure().await.unwrap();
        assert!(!manager.is_procedure_paused().await.unwrap());
    }

    #[tokio::test]
    async fn test_recovery_mode() {
        let manager = new_manager();

        // Off by default, on after set, off again after unset.
        assert!(!manager.recovery_mode().await.unwrap());
        manager.set_recovery_mode().await.unwrap();
        assert!(manager.recovery_mode().await.unwrap());
        manager.unset_recovery_mode().await.unwrap();
        assert!(!manager.recovery_mode().await.unwrap());
    }
}