puffin/puffin_manager/
file_accessor.rs1use async_trait::async_trait;
16use common_base::range_read::{FileReader, SizeAwareRangeReader};
17use common_test_util::temp_dir::{create_temp_dir, TempDir};
18use futures::AsyncWrite;
19use tokio::fs::File;
20use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
21
22use crate::error::Result;
23
24#[async_trait]
26#[auto_impl::auto_impl(Arc)]
27pub trait PuffinFileAccessor: Send + Sync + 'static {
28 type Reader: SizeAwareRangeReader + Sync;
29 type Writer: AsyncWrite + Unpin + Send;
30 type FileHandle: ToString + Clone + Send + Sync;
31
32 async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader>;
34
35 async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer>;
37}
38
39pub struct MockFileAccessor {
40 tempdir: TempDir,
41}
42
43impl MockFileAccessor {
44 pub fn new(prefix: &str) -> Self {
45 let tempdir = create_temp_dir(prefix);
46 Self { tempdir }
47 }
48}
49
50#[async_trait]
51impl PuffinFileAccessor for MockFileAccessor {
52 type Reader = FileReader;
53 type Writer = Compat<File>;
54 type FileHandle = String;
55
56 async fn reader(&self, handle: &String) -> Result<Self::Reader> {
57 Ok(FileReader::new(self.tempdir.path().join(handle))
58 .await
59 .unwrap())
60 }
61
62 async fn writer(&self, handle: &String) -> Result<Self::Writer> {
63 let p = self.tempdir.path().join(handle);
64 if let Some(p) = p.parent() {
65 if !tokio::fs::try_exists(p).await.unwrap() {
66 tokio::fs::create_dir_all(p).await.unwrap();
67 }
68 }
69 let f = tokio::fs::File::create(p).await.unwrap();
70 Ok(f.compat())
71 }
72}