mito2/manifest/storage/
staging.rs1use std::sync::Arc;
16
17use common_datasource::compression::CompressionType;
18use common_telemetry::debug;
19use object_store::{Lister, ObjectStore, util};
20use snafu::ResultExt;
21use store_api::ManifestVersion;
22
23use crate::error::{OpenDalSnafu, Result};
24use crate::manifest::storage::delta::DeltaStorage;
25use crate::manifest::storage::size_tracker::NoopTracker;
26use crate::manifest::storage::utils::sort_manifests;
27use crate::manifest::storage::{file_version, is_delta_file};
28
/// Stores opaque blobs under a single directory of an [`ObjectStore`].
///
/// The relative paths given to `put` / `get` are appended to `path` to form the
/// object key.
#[derive(Debug, Clone)]
pub(crate) struct StagingBlobStorage {
    /// Backend that every read and write goes through.
    object_store: ObjectStore,
    /// Directory prefix of all blobs; `new` passes it through `util::normalize_dir`.
    path: String,
}
39
40pub fn staging_blob_path(manifest_path: &str) -> String {
46 let parent_dir = manifest_path
47 .trim_end_matches("manifest/")
48 .trim_end_matches('/');
49 util::normalize_dir(&format!("{}/staging/blob", parent_dir))
50}
51
52impl StagingBlobStorage {
53 pub fn new(path: String, object_store: ObjectStore) -> Self {
54 let path = util::normalize_dir(&path);
55 common_telemetry::debug!(
56 "Staging blob storage path: {}, root: {}",
57 path,
58 object_store.info().root()
59 );
60 Self { object_store, path }
61 }
62
63 pub fn path(&self) -> &str {
65 &self.path
66 }
67
68 pub fn object_store(&self) -> &ObjectStore {
70 &self.object_store
71 }
72
73 pub async fn put(&self, path: &str, bytes: Vec<u8>) -> Result<()> {
75 let path = format!("{}{}", self.path, path);
76 common_telemetry::debug!(
77 "Putting blob to staging blob storage, path: {}, root: {}, bytes: {}",
78 path,
79 self.object_store.info().root(),
80 bytes.len()
81 );
82 self.object_store
83 .write(&path, bytes)
84 .await
85 .context(OpenDalSnafu)?;
86 Ok(())
87 }
88
89 pub async fn get(&self, path: &str) -> Result<Vec<u8>> {
91 let path = format!("{}{}", self.path, path);
92 common_telemetry::debug!(
93 "Reading blob from staging blob storage, path: {}, root: {}",
94 path,
95 self.object_store.info().root()
96 );
97 let bytes = self.object_store.read(&path).await.context(OpenDalSnafu)?;
98
99 Ok(bytes.to_vec())
100 }
101}
102
/// Staging area of a manifest: staged delta files plus auxiliary blobs.
///
/// `new` places the two parts at `staging_manifest_path` and `staging_blob_path`
/// of the given manifest directory.
#[derive(Debug, Clone)]
pub(crate) struct StagingStorage {
    /// Delta files in the staging manifest directory; built with a [`NoopTracker`].
    delta_storage: DeltaStorage<NoopTracker>,
    /// Blobs in the staging blob directory.
    blob_storage: StagingBlobStorage,
}
117
118pub fn staging_manifest_path(manifest_path: &str) -> String {
124 let parent_dir = manifest_path
125 .trim_end_matches("manifest/")
126 .trim_end_matches('/');
127 util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
128}
129
130impl StagingStorage {
131 pub fn new(path: String, object_store: ObjectStore, compress_type: CompressionType) -> Self {
132 let staging_blob_path = staging_blob_path(&path);
133 let blob_storage = StagingBlobStorage::new(staging_blob_path, object_store.clone());
134 let staging_manifest_path = staging_manifest_path(&path);
135 let delta_storage = DeltaStorage::new(
136 staging_manifest_path.clone(),
137 object_store.clone(),
138 compress_type,
139 None,
141 Arc::new(NoopTracker),
144 );
145
146 Self {
147 delta_storage,
148 blob_storage,
149 }
150 }
151
152 pub(crate) fn blob_storage(&self) -> &StagingBlobStorage {
154 &self.blob_storage
155 }
156
157 pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
159 self.delta_storage.manifest_lister().await
160 }
161
162 pub(crate) async fn fetch_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
164 let manifest_entries = self
165 .delta_storage
166 .get_paths(|entry| {
167 let file_name = entry.name();
168 if is_delta_file(file_name) {
169 let version = file_version(file_name);
170 Some((version, entry))
171 } else {
172 None
173 }
174 })
175 .await?;
176
177 let mut sorted_entries = manifest_entries;
178 sort_manifests(&mut sorted_entries);
179
180 self.delta_storage
181 .fetch_manifests_from_entries(sorted_entries)
182 .await
183 }
184
185 pub(crate) async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
187 self.delta_storage.save(version, bytes).await
188 }
189
190 pub(crate) async fn clear(&self) -> Result<()> {
192 self.delta_storage
193 .object_store()
194 .remove_all(self.delta_storage.path())
195 .await
196 .context(OpenDalSnafu)?;
197
198 self.blob_storage
199 .object_store()
200 .remove_all(self.blob_storage.path())
201 .await
202 .context(OpenDalSnafu)?;
203
204 debug!(
205 "Cleared all staging manifest files from {}, blob directory: {}",
206 self.delta_storage.path(),
207 self.blob_storage.path()
208 );
209
210 Ok(())
211 }
212}
213
#[cfg(test)]
impl StagingStorage {
    /// Test-only hook that forwards `compress_type` to the underlying delta storage.
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        self.delta_storage.set_compress_type(compress_type);
    }
}
220
#[cfg(test)]
mod tests {
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::TryStreamExt;
    use object_store::services::Fs;
    use object_store::{ErrorKind, ObjectStore};

    use super::{StagingStorage, staging_blob_path, staging_manifest_path};

    /// The manifest directory maps to its sibling `staging/manifest/` directory.
    #[test]
    fn test_staging_path() {
        assert_eq!(
            staging_manifest_path("/data/table/region_0001/manifest/"),
            "/data/table/region_0001/staging/manifest/"
        );
    }

    /// The manifest directory maps to its sibling `staging/blob/` directory.
    #[test]
    fn test_staging_blob_path() {
        assert_eq!(
            staging_blob_path("/data/table/region_0001/manifest/"),
            "/data/table/region_0001/staging/blob/"
        );
    }

    /// Counts the entries listed under `path`; a missing directory counts as empty.
    async fn count_entries(object_store: &ObjectStore, path: &str) -> usize {
        let lister = match object_store.lister_with(path).await {
            Ok(lister) => lister,
            Err(err) if err.kind() == ErrorKind::NotFound => return 0,
            Err(err) => panic!("failed to list {path}: {err}"),
        };
        let entries: Vec<_> = lister.try_collect().await.unwrap();
        entries.len()
    }

    /// `clear` must wipe both the staged manifests and the staged blobs.
    #[tokio::test]
    async fn test_staging_clear_removes_blobs() {
        common_telemetry::init_default_ut_logging();

        // Keep the temp dir guard alive for the whole test.
        let root = create_temp_dir("test_staging_storage_clear");
        let object_store = ObjectStore::new(Fs::default().root(&root.path().to_string_lossy()))
            .unwrap()
            .finish();

        let manifest_path = "/data/table/region_0001/manifest/";
        let manifest_dir = staging_manifest_path(manifest_path);
        let blob_dir = staging_blob_path(manifest_path);
        let mut storage = StagingStorage::new(
            manifest_path.to_string(),
            object_store.clone(),
            CompressionType::Uncompressed,
        );

        storage.save(0, b"manifest").await.unwrap();
        storage
            .blob_storage()
            .put("region_0002_manifest", b"blob".to_vec())
            .await
            .unwrap();

        // Both directories are populated before clearing ...
        for dir in [&manifest_dir, &blob_dir] {
            assert!(count_entries(&object_store, dir).await > 0);
        }

        storage.clear().await.unwrap();

        // ... and empty afterwards.
        for dir in [&manifest_dir, &blob_dir] {
            assert_eq!(count_entries(&object_store, dir).await, 0);
        }
    }
}