common_procedure/store/
state_store.rs1use std::pin::Pin;
16use std::sync::Arc;
17
18use async_stream::try_stream;
19use async_trait::async_trait;
20use common_error::ext::{BoxedError, PlainError};
21use common_error::status_code::StatusCode;
22use futures::{Stream, StreamExt};
23use object_store::{EntryMode, ObjectStore};
24use snafu::ResultExt;
25
26use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
27
/// A base key together with the number of numbered segment keys derived from it.
///
/// The set expands (see `keys`) into the base key followed by one zero-padded
/// segment key per segment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeySet {
    /// The base key.
    key: String,
    /// Number of additional segment keys that follow the base key.
    segments: usize,
}

impl PartialOrd for KeySet {
    /// Delegates to [`Ord::cmp`], so the partial and total orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for KeySet {
    /// Orders key sets by `key` first and breaks ties with `segments`.
    ///
    /// The tie-break keeps the ordering consistent with the derived `Eq`, which
    /// compares both fields: two key sets compare as `Equal` only when they are
    /// also `==`. Sorting by key is unaffected because the key stays the primary
    /// criterion.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key
            .cmp(&other.key)
            .then_with(|| self.segments.cmp(&other.segments))
    }
}
46
47impl From<&str> for KeySet {
48 fn from(value: &str) -> Self {
49 KeySet {
50 key: value.to_string(),
51 segments: 0,
52 }
53 }
54}
55
56impl KeySet {
57 pub fn new(key: String, segments: usize) -> Self {
58 Self { key, segments }
59 }
60
61 pub fn with_segment_suffix(key: &str, version: usize) -> String {
62 format!("{key}/{version:010}")
63 }
64
65 pub fn with_prefix(key: &str) -> String {
66 format!("{key}/")
67 }
68
69 pub fn keys(&self) -> Vec<String> {
70 let mut keys = Vec::with_capacity(self.segments + 1);
71 keys.push(self.key.to_string());
72 for i in 1..=self.segments {
73 keys.push(Self::with_segment_suffix(&self.key, i))
74 }
75
76 keys
77 }
78
79 pub fn key(&self) -> &str {
80 &self.key
81 }
82}
83
84pub type KeyValue = (KeySet, Vec<u8>);
86
87pub type KeyValueStream = Pin<Box<dyn Stream<Item = Result<KeyValue>> + Send>>;
89
90#[async_trait]
92pub trait StateStore: Send + Sync {
93 async fn put(&self, key: &str, value: Vec<u8>) -> Result<()>;
95
96 async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream>;
102
103 async fn batch_delete(&self, keys: &[String]) -> Result<()>;
105
106 async fn delete(&self, key: &str) -> Result<()>;
109}
110
111pub(crate) type StateStoreRef = Arc<dyn StateStore>;
113
114#[derive(Debug)]
116pub struct ObjectStateStore {
117 store: ObjectStore,
118}
119
120impl ObjectStateStore {
121 pub fn new(store: ObjectStore) -> ObjectStateStore {
123 ObjectStateStore { store }
124 }
125}
126
127#[async_trait]
128impl StateStore for ObjectStateStore {
129 async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
130 self.store
131 .write(key, value)
132 .await
133 .map_err(|e| {
134 BoxedError::new(PlainError::new(
135 e.to_string(),
136 StatusCode::StorageUnavailable,
137 ))
138 })
139 .context(PutStateSnafu { key })
140 .map(|_| ())
141 }
142
143 async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
144 let mut lister = self
145 .store
146 .lister_with(path)
147 .recursive(true)
148 .await
149 .map_err(|e| {
150 BoxedError::new(PlainError::new(
151 e.to_string(),
152 StatusCode::StorageUnavailable,
153 ))
154 })
155 .context(ListStateSnafu { path })?;
156
157 let store = self.store.clone();
158
159 let path_string = path.to_string();
160 let stream = try_stream!({
161 while let Some(res) = lister.next().await {
162 let entry = res
163 .map_err(|e| {
164 BoxedError::new(PlainError::new(
165 e.to_string(),
166 StatusCode::StorageUnavailable,
167 ))
168 })
169 .context(ListStateSnafu { path: &path_string })?;
170 let key = entry.path();
171
172 if let EntryMode::FILE = entry.metadata().mode() {
173 let value = store
174 .read(key)
175 .await
176 .map_err(|e| {
177 BoxedError::new(PlainError::new(
178 e.to_string(),
179 StatusCode::StorageUnavailable,
180 ))
181 })
182 .context(ListStateSnafu { path: key })?;
183 yield (key.into(), value.to_vec());
184 }
185 }
186 });
187
188 Ok(Box::pin(stream))
189 }
190
191 async fn batch_delete(&self, keys: &[String]) -> Result<()> {
192 self.store
193 .delete_iter(keys.iter().map(String::as_str))
194 .await
195 .with_context(|_| DeleteStateSnafu {
196 key: format!("{:?}", keys),
197 })?;
198
199 Ok(())
200 }
201
202 async fn delete(&self, key: &str) -> Result<()> {
203 self.store
204 .delete(key)
205 .await
206 .with_context(|_| DeleteStateSnafu { key })?;
207
208 Ok(())
209 }
210}
211
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use futures_util::TryStreamExt;
    use object_store::services::Fs as Builder;

    use super::*;

    /// Builds an [`ObjectStateStore`] on a filesystem object store rooted at `root`.
    fn new_state_store(root: &str) -> ObjectStateStore {
        let builder = Builder::default().root(root);
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ObjectStateStore::new(object_store)
    }

    /// Writes the three entries shared by the tests below.
    async fn put_fixtures(state_store: &ObjectStateStore) {
        state_store.put("a/1", b"v1".to_vec()).await.unwrap();
        state_store.put("a/2", b"v2".to_vec()).await.unwrap();
        state_store.put("b/1", b"v3".to_vec()).await.unwrap();
    }

    /// Collects everything under `path`, sorted by key so that the assertions
    /// do not depend on the listing order.
    async fn walk_sorted(state_store: &ObjectStateStore, path: &str) -> Vec<KeyValue> {
        let mut pairs: Vec<KeyValue> = state_store
            .walk_top_down(path)
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        pairs.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
        pairs
    }

    /// Shorthand for an expected key/value pair.
    fn kv(key: &str, value: &[u8]) -> KeyValue {
        (key.into(), value.to_vec())
    }

    #[tokio::test]
    async fn test_object_state_store() {
        let dir = create_temp_dir("state_store");
        let state_store = new_state_store(dir.path().to_str().unwrap());

        // A fresh store has nothing to walk.
        assert!(walk_sorted(&state_store, "/").await.is_empty());

        put_fixtures(&state_store).await;

        // Walking the root returns every entry.
        assert_eq!(
            vec![kv("a/1", b"v1"), kv("a/2", b"v2"), kv("b/1", b"v3")],
            walk_sorted(&state_store, "/").await
        );

        // Walking a sub-path returns only the entries below it.
        assert_eq!(
            vec![kv("a/1", b"v1"), kv("a/2", b"v2")],
            walk_sorted(&state_store, "a/").await
        );

        state_store
            .batch_delete(&["a/2".to_string(), "b/1".to_string()])
            .await
            .unwrap();
        assert_eq!(vec![kv("a/1", b"v1")], walk_sorted(&state_store, "a/").await);
    }

    #[tokio::test]
    async fn test_object_state_store_delete() {
        let dir = create_temp_dir("state_store_list");
        let state_store = new_state_store(dir.path().to_str().unwrap());

        put_fixtures(&state_store).await;

        state_store.delete("b/1").await.unwrap();

        // Entries under another prefix are untouched by the delete.
        assert_eq!(
            vec![kv("a/1", b"v1"), kv("a/2", b"v2")],
            walk_sorted(&state_store, "a/").await
        );

        // Deleting a key that no longer exists must still succeed.
        state_store.delete("b/1").await.unwrap();
    }
}