common_meta/key/
runtime_switch.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use common_error::ext::BoxedError;
19use common_procedure::local::PauseAware;
20use moka::future::Cache;
21use snafu::ResultExt;
22
23use crate::error::{GetCacheSnafu, Result};
24use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY};
25use crate::kv_backend::KvBackendRef;
26use crate::rpc::store::{BatchDeleteRequest, PutRequest};
27
28pub type RuntimeSwitchManagerRef = Arc<RuntimeSwitchManager>;
29
30#[derive(Clone)]
34pub struct RuntimeSwitchManager {
35 kv_backend: KvBackendRef,
36 cache: Cache<Vec<u8>, Option<Vec<u8>>>,
37}
38
39#[async_trait::async_trait]
40impl PauseAware for RuntimeSwitchManager {
41 async fn is_paused(&self) -> std::result::Result<bool, BoxedError> {
42 self.is_procedure_paused().await.map_err(BoxedError::new)
43 }
44}
45
46const CACHE_TTL: Duration = Duration::from_secs(10);
47const MAX_CAPACITY: u64 = 32;
48
49impl RuntimeSwitchManager {
50 pub fn new(kv_backend: KvBackendRef) -> Self {
51 let cache = Cache::builder()
52 .time_to_live(CACHE_TTL)
53 .max_capacity(MAX_CAPACITY)
54 .build();
55 Self { kv_backend, cache }
56 }
57
58 async fn put_key(&self, key: &str) -> Result<()> {
59 let req = PutRequest {
60 key: Vec::from(key),
61 value: vec![],
62 prev_kv: false,
63 };
64 self.kv_backend.put(req).await?;
65 self.cache.invalidate(key.as_bytes()).await;
66 Ok(())
67 }
68
69 async fn delete_keys(&self, keys: &[&str]) -> Result<()> {
70 let req = BatchDeleteRequest::new()
71 .with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect());
72 self.kv_backend.batch_delete(req).await?;
73 for key in keys {
74 self.cache.invalidate(key.as_bytes()).await;
75 }
76 Ok(())
77 }
78
79 async fn exists(&self, key: &str) -> Result<bool> {
81 let key = key.as_bytes().to_vec();
82 let kv_backend = self.kv_backend.clone();
83 let value = self
84 .cache
85 .try_get_with(key.clone(), async move {
86 kv_backend.get(&key).await.map(|v| v.map(|v| v.value))
87 })
88 .await
89 .context(GetCacheSnafu)?;
90
91 Ok(value.is_some())
92 }
93
94 pub async fn set_maintenance_mode(&self) -> Result<()> {
96 self.put_key(MAINTENANCE_KEY).await
97 }
98
99 pub async fn unset_maintenance_mode(&self) -> Result<()> {
101 self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY])
102 .await
103 }
104
105 pub async fn maintenance_mode(&self) -> Result<bool> {
107 let exists = self.exists(MAINTENANCE_KEY).await?;
108 if exists {
109 return Ok(true);
110 }
111
112 let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?;
113 if exists {
114 return Ok(true);
115 }
116
117 Ok(false)
118 }
119
120 pub async fn pasue_procedure(&self) -> Result<()> {
122 self.put_key(PAUSE_PROCEDURE_KEY).await
123 }
124
125 pub async fn resume_procedure(&self) -> Result<()> {
127 self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await
128 }
129
130 pub async fn is_procedure_paused(&self) -> Result<bool> {
132 self.exists(PAUSE_PROCEDURE_KEY).await
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use std::sync::Arc;
139
140 use crate::key::runtime_switch::RuntimeSwitchManager;
141 use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY};
142 use crate::kv_backend::memory::MemoryKvBackend;
143 use crate::kv_backend::KvBackend;
144 use crate::rpc::store::PutRequest;
145
146 #[tokio::test]
147 async fn test_runtime_switch_manager_basic() {
148 let runtime_switch_manager =
149 Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
150 runtime_switch_manager
151 .put_key(MAINTENANCE_KEY)
152 .await
153 .unwrap();
154 let v = runtime_switch_manager
155 .cache
156 .get(MAINTENANCE_KEY.as_bytes())
157 .await;
158 assert!(v.is_none());
159 runtime_switch_manager
160 .exists(MAINTENANCE_KEY)
161 .await
162 .unwrap();
163 let v = runtime_switch_manager
164 .cache
165 .get(MAINTENANCE_KEY.as_bytes())
166 .await;
167 assert!(v.is_some());
168 runtime_switch_manager
169 .delete_keys(&[MAINTENANCE_KEY])
170 .await
171 .unwrap();
172 let v = runtime_switch_manager
173 .cache
174 .get(MAINTENANCE_KEY.as_bytes())
175 .await;
176 assert!(v.is_none());
177 }
178
179 #[tokio::test]
180 async fn test_runtime_switch_manager() {
181 let runtime_switch_manager =
182 Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
183 assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
184 runtime_switch_manager.set_maintenance_mode().await.unwrap();
185 assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
186 runtime_switch_manager
187 .unset_maintenance_mode()
188 .await
189 .unwrap();
190 assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
191 }
192
193 #[tokio::test]
194 async fn test_runtime_switch_manager_with_legacy_key() {
195 let kv_backend = Arc::new(MemoryKvBackend::new());
196 kv_backend
197 .put(PutRequest {
198 key: Vec::from(LEGACY_MAINTENANCE_KEY),
199 value: vec![],
200 prev_kv: false,
201 })
202 .await
203 .unwrap();
204 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend));
205 assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
206 runtime_switch_manager
207 .unset_maintenance_mode()
208 .await
209 .unwrap();
210 assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
211 runtime_switch_manager.set_maintenance_mode().await.unwrap();
212 assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
213 }
214
215 #[tokio::test]
216 async fn test_pasue_procedure() {
217 let runtime_switch_manager =
218 Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
219 runtime_switch_manager.pasue_procedure().await.unwrap();
220 assert!(runtime_switch_manager.is_procedure_paused().await.unwrap());
221 runtime_switch_manager.resume_procedure().await.unwrap();
222 assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap());
223 }
224}