common_meta/key/
runtime_switch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use common_error::ext::BoxedError;
19use common_procedure::local::PauseAware;
20use moka::future::Cache;
21use snafu::ResultExt;
22
23use crate::error::{GetCacheSnafu, Result};
24use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY};
25use crate::kv_backend::KvBackendRef;
26use crate::rpc::store::{BatchDeleteRequest, PutRequest};
27
28pub type RuntimeSwitchManagerRef = Arc<RuntimeSwitchManager>;
29
30/// The runtime switch manager.
31///
32/// Used to enable or disable runtime switches.
33#[derive(Clone)]
34pub struct RuntimeSwitchManager {
35    kv_backend: KvBackendRef,
36    cache: Cache<Vec<u8>, Option<Vec<u8>>>,
37}
38
39#[async_trait::async_trait]
40impl PauseAware for RuntimeSwitchManager {
41    async fn is_paused(&self) -> std::result::Result<bool, BoxedError> {
42        self.is_procedure_paused().await.map_err(BoxedError::new)
43    }
44}
45
46const CACHE_TTL: Duration = Duration::from_secs(10);
47const MAX_CAPACITY: u64 = 32;
48
49impl RuntimeSwitchManager {
50    pub fn new(kv_backend: KvBackendRef) -> Self {
51        let cache = Cache::builder()
52            .time_to_live(CACHE_TTL)
53            .max_capacity(MAX_CAPACITY)
54            .build();
55        Self { kv_backend, cache }
56    }
57
58    async fn put_key(&self, key: &str) -> Result<()> {
59        let req = PutRequest {
60            key: Vec::from(key),
61            value: vec![],
62            prev_kv: false,
63        };
64        self.kv_backend.put(req).await?;
65        self.cache.invalidate(key.as_bytes()).await;
66        Ok(())
67    }
68
69    async fn delete_keys(&self, keys: &[&str]) -> Result<()> {
70        let req = BatchDeleteRequest::new()
71            .with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect());
72        self.kv_backend.batch_delete(req).await?;
73        for key in keys {
74            self.cache.invalidate(key.as_bytes()).await;
75        }
76        Ok(())
77    }
78
79    /// Returns true if the key exists.
80    async fn exists(&self, key: &str) -> Result<bool> {
81        let key = key.as_bytes().to_vec();
82        let kv_backend = self.kv_backend.clone();
83        let value = self
84            .cache
85            .try_get_with(key.clone(), async move {
86                kv_backend.get(&key).await.map(|v| v.map(|v| v.value))
87            })
88            .await
89            .context(GetCacheSnafu)?;
90
91        Ok(value.is_some())
92    }
93
94    /// Enables maintenance mode.
95    pub async fn set_maintenance_mode(&self) -> Result<()> {
96        self.put_key(MAINTENANCE_KEY).await
97    }
98
99    /// Unsets maintenance mode.
100    pub async fn unset_maintenance_mode(&self) -> Result<()> {
101        self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY])
102            .await
103    }
104
105    /// Returns true if maintenance mode is enabled.
106    pub async fn maintenance_mode(&self) -> Result<bool> {
107        let exists = self.exists(MAINTENANCE_KEY).await?;
108        if exists {
109            return Ok(true);
110        }
111
112        let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?;
113        if exists {
114            return Ok(true);
115        }
116
117        Ok(false)
118    }
119
120    // Pauses handling of incoming procedure requests.
121    pub async fn pasue_procedure(&self) -> Result<()> {
122        self.put_key(PAUSE_PROCEDURE_KEY).await
123    }
124
125    /// Resumes processing of incoming procedure requests.
126    pub async fn resume_procedure(&self) -> Result<()> {
127        self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await
128    }
129
130    /// Returns true if the system is currently pausing incoming procedure requests.
131    pub async fn is_procedure_paused(&self) -> Result<bool> {
132        self.exists(PAUSE_PROCEDURE_KEY).await
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use std::sync::Arc;
139
140    use crate::key::runtime_switch::RuntimeSwitchManager;
141    use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY};
142    use crate::kv_backend::memory::MemoryKvBackend;
143    use crate::kv_backend::KvBackend;
144    use crate::rpc::store::PutRequest;
145
146    #[tokio::test]
147    async fn test_runtime_switch_manager_basic() {
148        let runtime_switch_manager =
149            Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
150        runtime_switch_manager
151            .put_key(MAINTENANCE_KEY)
152            .await
153            .unwrap();
154        let v = runtime_switch_manager
155            .cache
156            .get(MAINTENANCE_KEY.as_bytes())
157            .await;
158        assert!(v.is_none());
159        runtime_switch_manager
160            .exists(MAINTENANCE_KEY)
161            .await
162            .unwrap();
163        let v = runtime_switch_manager
164            .cache
165            .get(MAINTENANCE_KEY.as_bytes())
166            .await;
167        assert!(v.is_some());
168        runtime_switch_manager
169            .delete_keys(&[MAINTENANCE_KEY])
170            .await
171            .unwrap();
172        let v = runtime_switch_manager
173            .cache
174            .get(MAINTENANCE_KEY.as_bytes())
175            .await;
176        assert!(v.is_none());
177    }
178
179    #[tokio::test]
180    async fn test_runtime_switch_manager() {
181        let runtime_switch_manager =
182            Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
183        assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
184        runtime_switch_manager.set_maintenance_mode().await.unwrap();
185        assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
186        runtime_switch_manager
187            .unset_maintenance_mode()
188            .await
189            .unwrap();
190        assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
191    }
192
193    #[tokio::test]
194    async fn test_runtime_switch_manager_with_legacy_key() {
195        let kv_backend = Arc::new(MemoryKvBackend::new());
196        kv_backend
197            .put(PutRequest {
198                key: Vec::from(LEGACY_MAINTENANCE_KEY),
199                value: vec![],
200                prev_kv: false,
201            })
202            .await
203            .unwrap();
204        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend));
205        assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
206        runtime_switch_manager
207            .unset_maintenance_mode()
208            .await
209            .unwrap();
210        assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
211        runtime_switch_manager.set_maintenance_mode().await.unwrap();
212        assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
213    }
214
215    #[tokio::test]
216    async fn test_pasue_procedure() {
217        let runtime_switch_manager =
218            Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
219        runtime_switch_manager.pasue_procedure().await.unwrap();
220        assert!(runtime_switch_manager.is_procedure_paused().await.unwrap());
221        runtime_switch_manager.resume_procedure().await.unwrap();
222        assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap());
223    }
224}