common_meta/key/
runtime_switch.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use common_error::ext::BoxedError;
19use common_procedure::local::PauseAware;
20use moka::future::Cache;
21use snafu::ResultExt;
22
23use crate::error::{GetCacheSnafu, Result};
24use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY, RECOVERY_MODE_KEY};
25use crate::kv_backend::KvBackendRef;
26use crate::rpc::store::{BatchDeleteRequest, PutRequest};
27
28pub type RuntimeSwitchManagerRef = Arc<RuntimeSwitchManager>;
29
30#[derive(Clone)]
34pub struct RuntimeSwitchManager {
35 kv_backend: KvBackendRef,
36 cache: Cache<Vec<u8>, Option<Vec<u8>>>,
37}
38
39#[async_trait::async_trait]
40impl PauseAware for RuntimeSwitchManager {
41 async fn is_paused(&self) -> std::result::Result<bool, BoxedError> {
42 self.is_procedure_paused().await.map_err(BoxedError::new)
43 }
44}
45
/// Time-to-live handed to the lookup cache builder (10 seconds).
const CACHE_TTL: Duration = Duration::from_secs(10);
/// Maximum capacity handed to the lookup cache builder.
const MAX_CAPACITY: u64 = 32;
48
49impl RuntimeSwitchManager {
50 pub fn new(kv_backend: KvBackendRef) -> Self {
51 let cache = Cache::builder()
52 .time_to_live(CACHE_TTL)
53 .max_capacity(MAX_CAPACITY)
54 .build();
55 Self { kv_backend, cache }
56 }
57
58 async fn put_key(&self, key: &str) -> Result<()> {
59 let req = PutRequest {
60 key: Vec::from(key),
61 value: vec![],
62 prev_kv: false,
63 };
64 self.kv_backend.put(req).await?;
65 self.cache.invalidate(key.as_bytes()).await;
66 Ok(())
67 }
68
69 async fn delete_keys(&self, keys: &[&str]) -> Result<()> {
70 let req = BatchDeleteRequest::new()
71 .with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect());
72 self.kv_backend.batch_delete(req).await?;
73 for key in keys {
74 self.cache.invalidate(key.as_bytes()).await;
75 }
76 Ok(())
77 }
78
79 async fn exists(&self, key: &str) -> Result<bool> {
81 let key = key.as_bytes().to_vec();
82 let kv_backend = self.kv_backend.clone();
83 let value = self
84 .cache
85 .try_get_with(key.clone(), async move {
86 kv_backend.get(&key).await.map(|v| v.map(|v| v.value))
87 })
88 .await
89 .context(GetCacheSnafu)?;
90
91 Ok(value.is_some())
92 }
93
94 pub async fn set_maintenance_mode(&self) -> Result<()> {
96 self.put_key(MAINTENANCE_KEY).await
97 }
98
99 pub async fn unset_maintenance_mode(&self) -> Result<()> {
101 self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY])
102 .await
103 }
104
105 pub async fn maintenance_mode(&self) -> Result<bool> {
107 let exists = self.exists(MAINTENANCE_KEY).await?;
108 if exists {
109 return Ok(true);
110 }
111
112 let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?;
113 if exists {
114 return Ok(true);
115 }
116
117 Ok(false)
118 }
119
120 pub async fn pasue_procedure(&self) -> Result<()> {
122 self.put_key(PAUSE_PROCEDURE_KEY).await
123 }
124
125 pub async fn resume_procedure(&self) -> Result<()> {
127 self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await
128 }
129
130 pub async fn is_procedure_paused(&self) -> Result<bool> {
132 self.exists(PAUSE_PROCEDURE_KEY).await
133 }
134
135 pub async fn set_recovery_mode(&self) -> Result<()> {
137 self.put_key(RECOVERY_MODE_KEY).await
138 }
139
140 pub async fn unset_recovery_mode(&self) -> Result<()> {
142 self.delete_keys(&[RECOVERY_MODE_KEY]).await
143 }
144
145 pub async fn recovery_mode(&self) -> Result<bool> {
147 self.exists(RECOVERY_MODE_KEY).await
148 }
149}
150
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::key::runtime_switch::RuntimeSwitchManager;
    use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY};
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Builds a manager on top of a fresh, empty in-memory backend.
    fn new_manager() -> Arc<RuntimeSwitchManager> {
        Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())))
    }

    #[tokio::test]
    async fn test_runtime_switch_manager_basic() {
        let manager = new_manager();

        // A write must not leave a cache entry behind.
        manager.put_key(MAINTENANCE_KEY).await.unwrap();
        let cached = manager.cache.get(MAINTENANCE_KEY.as_bytes()).await;
        assert!(cached.is_none());

        // A lookup populates the cache.
        manager.exists(MAINTENANCE_KEY).await.unwrap();
        let cached = manager.cache.get(MAINTENANCE_KEY.as_bytes()).await;
        assert!(cached.is_some());

        // A delete invalidates the cached entry again.
        manager.delete_keys(&[MAINTENANCE_KEY]).await.unwrap();
        let cached = manager.cache.get(MAINTENANCE_KEY.as_bytes()).await;
        assert!(cached.is_none());
    }

    #[tokio::test]
    async fn test_runtime_switch_manager() {
        let manager = new_manager();

        assert!(!manager.maintenance_mode().await.unwrap());
        manager.set_maintenance_mode().await.unwrap();
        assert!(manager.maintenance_mode().await.unwrap());
        manager.unset_maintenance_mode().await.unwrap();
        assert!(!manager.maintenance_mode().await.unwrap());
    }

    #[tokio::test]
    async fn test_runtime_switch_manager_with_legacy_key() {
        // Seed the backend with only the legacy key before the manager exists.
        let kv_backend = Arc::new(MemoryKvBackend::new());
        kv_backend
            .put(PutRequest {
                key: Vec::from(LEGACY_MAINTENANCE_KEY),
                value: vec![],
                prev_kv: false,
            })
            .await
            .unwrap();
        let manager = Arc::new(RuntimeSwitchManager::new(kv_backend));

        // The legacy key alone counts as maintenance mode.
        assert!(manager.maintenance_mode().await.unwrap());
        // Unsetting removes the legacy key as well.
        manager.unset_maintenance_mode().await.unwrap();
        assert!(!manager.maintenance_mode().await.unwrap());
        manager.set_maintenance_mode().await.unwrap();
        assert!(manager.maintenance_mode().await.unwrap());
    }

    #[tokio::test]
    async fn test_pasue_procedure() {
        let manager = new_manager();

        manager.pasue_procedure().await.unwrap();
        assert!(manager.is_procedure_paused().await.unwrap());
        manager.resume_procedure().await.unwrap();
        assert!(!manager.is_procedure_paused().await.unwrap());
    }

    #[tokio::test]
    async fn test_recovery_mode() {
        let manager = new_manager();

        assert!(!manager.recovery_mode().await.unwrap());
        manager.set_recovery_mode().await.unwrap();
        assert!(manager.recovery_mode().await.unwrap());
        manager.unset_recovery_mode().await.unwrap();
        assert!(!manager.recovery_mode().await.unwrap());
    }
}