common_procedure/store/
state_store.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17
18use async_stream::try_stream;
19use async_trait::async_trait;
20use common_error::ext::{BoxedError, PlainError};
21use common_error::status_code::StatusCode;
22use futures::{Stream, StreamExt};
23use object_store::{EntryMode, ObjectStore};
24use snafu::ResultExt;
25
26use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
27
/// A base key together with a count of numbered segment keys derived from it.
///
/// [KeySet::keys] expands it into the base key followed by the segment keys
/// `1..=segments` (built with [KeySet::with_segment_suffix]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeySet {
    /// The base key.
    key: String,
    /// Number of segment keys in addition to the base key.
    segments: usize,
}

impl PartialOrd for KeySet {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Orders by `key` first and breaks ties on `segments`.
///
/// The tie-break is required: the derived [PartialEq] compares both fields,
/// and the [Ord] contract demands `a.cmp(&b) == Ordering::Equal` exactly when
/// `a == b`. Ordering by key alone made distinct values compare as equal.
impl Ord for KeySet {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key
            .cmp(&other.key)
            .then_with(|| self.segments.cmp(&other.segments))
    }
}

/// Builds a [KeySet] for a single key with no segment keys.
impl From<&str> for KeySet {
    fn from(value: &str) -> Self {
        KeySet {
            key: value.to_string(),
            segments: 0,
        }
    }
}

impl KeySet {
    /// Creates a [KeySet] from a base `key` and its number of `segments`.
    pub fn new(key: String, segments: usize) -> Self {
        Self { key, segments }
    }

    /// Returns the key of segment `version` under `key`.
    ///
    /// The result is `"{key}/"` followed by `version` zero-padded to at least
    /// 10 digits, e.g. `with_segment_suffix("k", 1) == "k/0000000001"`.
    pub fn with_segment_suffix(key: &str, version: usize) -> String {
        format!("{key}/{version:010}")
    }

    /// Returns `key` with a trailing `/` appended.
    pub fn with_prefix(key: &str) -> String {
        format!("{key}/")
    }

    /// Returns the base key followed by the segment keys `1..=segments`.
    pub fn keys(&self) -> Vec<String> {
        let mut keys = Vec::with_capacity(self.segments + 1);
        keys.push(self.key.clone());
        keys.extend((1..=self.segments).map(|i| Self::with_segment_suffix(&self.key, i)));
        keys
    }

    /// Returns the base key.
    pub fn key(&self) -> &str {
        &self.key
    }
}
83
84/// Key value from state store.
85pub type KeyValue = (KeySet, Vec<u8>);
86
87/// Stream that yields [KeyValue].
88pub type KeyValueStream = Pin<Box<dyn Stream<Item = Result<KeyValue>> + Send>>;
89
90/// Storage layer for persisting procedure's state.
91#[async_trait]
92pub trait StateStore: Send + Sync {
93    /// Puts `key` and `value` into the store.
94    async fn put(&self, key: &str, value: Vec<u8>) -> Result<()>;
95
96    /// Returns the key-value pairs under `path` in top down way.
97    ///
98    /// # Note
99    /// - There is no guarantee about the order of the keys in the stream.
100    /// - The `path` must ends with `/`.
101    async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream>;
102
103    /// Deletes key-value pairs by `keys`.
104    async fn batch_delete(&self, keys: &[String]) -> Result<()>;
105
106    /// Deletes one key-value pair by `key`. Return `Ok` if the key
107    /// does not exist.
108    async fn delete(&self, key: &str) -> Result<()>;
109}
110
111/// Reference counted pointer to [StateStore].
112pub(crate) type StateStoreRef = Arc<dyn StateStore>;
113
114/// [StateStore] based on [ObjectStore].
115#[derive(Debug)]
116pub struct ObjectStateStore {
117    store: ObjectStore,
118}
119
120impl ObjectStateStore {
121    /// Returns a new [ObjectStateStore] with specific `store`.
122    pub fn new(store: ObjectStore) -> ObjectStateStore {
123        ObjectStateStore { store }
124    }
125}
126
127#[async_trait]
128impl StateStore for ObjectStateStore {
129    async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
130        self.store
131            .write(key, value)
132            .await
133            .map_err(|e| {
134                BoxedError::new(PlainError::new(
135                    e.to_string(),
136                    StatusCode::StorageUnavailable,
137                ))
138            })
139            .context(PutStateSnafu { key })
140            .map(|_| ())
141    }
142
143    async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
144        let mut lister = self
145            .store
146            .lister_with(path)
147            .recursive(true)
148            .await
149            .map_err(|e| {
150                BoxedError::new(PlainError::new(
151                    e.to_string(),
152                    StatusCode::StorageUnavailable,
153                ))
154            })
155            .context(ListStateSnafu { path })?;
156
157        let store = self.store.clone();
158
159        let path_string = path.to_string();
160        let stream = try_stream!({
161            while let Some(res) = lister.next().await {
162                let entry = res
163                    .map_err(|e| {
164                        BoxedError::new(PlainError::new(
165                            e.to_string(),
166                            StatusCode::StorageUnavailable,
167                        ))
168                    })
169                    .context(ListStateSnafu { path: &path_string })?;
170                let key = entry.path();
171
172                if let EntryMode::FILE = entry.metadata().mode() {
173                    let value = store
174                        .read(key)
175                        .await
176                        .map_err(|e| {
177                            BoxedError::new(PlainError::new(
178                                e.to_string(),
179                                StatusCode::StorageUnavailable,
180                            ))
181                        })
182                        .context(ListStateSnafu { path: key })?;
183                    yield (key.into(), value.to_vec());
184                }
185            }
186        });
187
188        Ok(Box::pin(stream))
189    }
190
191    async fn batch_delete(&self, keys: &[String]) -> Result<()> {
192        self.store
193            .delete_iter(keys.iter().map(String::as_str))
194            .await
195            .with_context(|_| DeleteStateSnafu {
196                key: format!("{:?}", keys),
197            })?;
198
199        Ok(())
200    }
201
202    async fn delete(&self, key: &str) -> Result<()> {
203        self.store
204            .delete(key)
205            .await
206            .with_context(|_| DeleteStateSnafu { key })?;
207
208        Ok(())
209    }
210}
211
#[cfg(test)]
mod tests {
    //! Round-trip tests for [ObjectStateStore] on a local filesystem backend.

    use common_test_util::temp_dir::create_temp_dir;
    use futures_util::TryStreamExt;
    use object_store::services::Fs as Builder;

    use super::*;

    #[tokio::test]
    async fn test_object_state_store() {
        let dir = create_temp_dir("state_store");
        let store_dir = dir.path().to_str().unwrap();
        let builder = Builder::default().root(store_dir);

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let state_store = ObjectStateStore::new(object_store);

        // An empty store yields nothing.
        let data: Vec<_> = state_store
            .walk_top_down("/")
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert!(data.is_empty());

        state_store.put("a/1", b"v1".to_vec()).await.unwrap();
        state_store.put("a/2", b"v2".to_vec()).await.unwrap();
        state_store.put("b/1", b"v3".to_vec()).await.unwrap();

        // Walk order is unspecified, so sort before comparing.
        let mut data: Vec<_> = state_store
            .walk_top_down("/")
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
                ("b/1".into(), b"v3".to_vec())
            ],
            data
        );

        let mut data: Vec<_> = state_store
            .walk_top_down("a/")
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
            ],
            data
        );

        state_store
            .batch_delete(&["a/2".to_string(), "b/1".to_string()])
            .await
            .unwrap();
        let mut data: Vec<_> = state_store
            .walk_top_down("a/")
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data);
    }

    #[tokio::test]
    async fn test_object_state_store_delete() {
        let dir = create_temp_dir("state_store_delete");
        let store_dir = dir.path().to_str().unwrap();
        let builder = Builder::default().root(store_dir);

        let object_store = ObjectStore::new(builder).unwrap().finish();
        let state_store = ObjectStateStore::new(object_store);

        state_store.put("a/1", b"v1".to_vec()).await.unwrap();
        state_store.put("a/2", b"v2".to_vec()).await.unwrap();
        state_store.put("b/1", b"v3".to_vec()).await.unwrap();

        state_store.delete("b/1").await.unwrap();

        let mut data: Vec<_> = state_store
            .walk_top_down("a/")
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
            ],
            data
        );

        // Delete returns Ok even if the key doesn't exist.
        state_store.delete("b/1").await.unwrap();
    }
}
324}