common_procedure/store/
state_store.rsuse std::pin::Pin;
use std::sync::Arc;
use async_stream::try_stream;
use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use futures::{Stream, StreamExt};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeySet {
key: String,
segments: usize,
}
impl PartialOrd for KeySet {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for KeySet {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.key.cmp(&other.key)
}
}
impl From<&str> for KeySet {
fn from(value: &str) -> Self {
KeySet {
key: value.to_string(),
segments: 0,
}
}
}
impl KeySet {
pub fn new(key: String, segments: usize) -> Self {
Self { key, segments }
}
pub fn with_segment_suffix(key: &str, version: usize) -> String {
format!("{key}/{version:010}")
}
pub fn with_prefix(key: &str) -> String {
format!("{key}/")
}
pub fn keys(&self) -> Vec<String> {
let mut keys = Vec::with_capacity(self.segments + 1);
keys.push(self.key.to_string());
for i in 1..=self.segments {
keys.push(Self::with_segment_suffix(&self.key, i))
}
keys
}
pub fn key(&self) -> &str {
&self.key
}
}
pub type KeyValue = (KeySet, Vec<u8>);
pub type KeyValueStream = Pin<Box<dyn Stream<Item = Result<KeyValue>> + Send>>;
#[async_trait]
pub trait StateStore: Send + Sync {
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()>;
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream>;
async fn batch_delete(&self, keys: &[String]) -> Result<()>;
async fn delete(&self, key: &str) -> Result<()>;
}
pub(crate) type StateStoreRef = Arc<dyn StateStore>;
#[derive(Debug)]
pub struct ObjectStateStore {
store: ObjectStore,
}
impl ObjectStateStore {
pub fn new(store: ObjectStore) -> ObjectStateStore {
ObjectStateStore { store }
}
}
#[async_trait]
impl StateStore for ObjectStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
self.store
.write(key, value)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(PutStateSnafu { key })
}
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let mut lister = self
.store
.lister_with(path)
.recursive(true)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path })?;
let store = self.store.clone();
let path_string = path.to_string();
let stream = try_stream!({
while let Some(res) = lister.next().await {
let entry = res
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
if let EntryMode::FILE = entry.metadata().mode() {
let value = store
.read(key)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
yield (key.into(), value.to_vec());
}
}
});
Ok(Box::pin(stream))
}
async fn batch_delete(&self, keys: &[String]) -> Result<()> {
self.store
.delete_iter(keys.iter().map(String::as_str))
.await
.with_context(|_| DeleteStateSnafu {
key: format!("{:?}", keys),
})?;
Ok(())
}
async fn delete(&self, key: &str) -> Result<()> {
self.store
.delete(key)
.await
.with_context(|_| DeleteStateSnafu { key })?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures_util::TryStreamExt;
use object_store::services::Fs as Builder;
use super::*;
#[tokio::test]
async fn test_object_state_store() {
let dir = create_temp_dir("state_store");
let store_dir = dir.path().to_str().unwrap();
let builder = Builder::default().root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let state_store = ObjectStateStore::new(object_store);
let data: Vec<_> = state_store
.walk_top_down("/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert!(data.is_empty());
state_store.put("a/1", b"v1".to_vec()).await.unwrap();
state_store.put("a/2", b"v2".to_vec()).await.unwrap();
state_store.put("b/1", b"v3".to_vec()).await.unwrap();
let mut data: Vec<_> = state_store
.walk_top_down("/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(
vec![
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
("b/1".into(), b"v3".to_vec())
],
data
);
let mut data: Vec<_> = state_store
.walk_top_down("a/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(
vec![
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
],
data
);
state_store
.batch_delete(&["a/2".to_string(), "b/1".to_string()])
.await
.unwrap();
let mut data: Vec<_> = state_store
.walk_top_down("a/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data);
}
#[tokio::test]
async fn test_object_state_store_delete() {
let dir = create_temp_dir("state_store_list");
let store_dir = dir.path().to_str().unwrap();
let builder = Builder::default().root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let state_store = ObjectStateStore::new(object_store);
state_store.put("a/1", b"v1".to_vec()).await.unwrap();
state_store.put("a/2", b"v2".to_vec()).await.unwrap();
state_store.put("b/1", b"v3".to_vec()).await.unwrap();
state_store.delete("b/1").await.unwrap();
let mut data: Vec<_> = state_store
.walk_top_down("a/")
.await
.unwrap()
.try_collect()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(
vec![
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
],
data
);
state_store.delete("b/1").await.unwrap();
}
}