mito2/manifest/storage/
staging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_datasource::compression::CompressionType;
18use common_telemetry::debug;
19use object_store::{Lister, ObjectStore, util};
20use snafu::ResultExt;
21use store_api::ManifestVersion;
22
23use crate::error::{OpenDalSnafu, Result};
24use crate::manifest::storage::delta::DeltaStorage;
25use crate::manifest::storage::size_tracker::NoopTracker;
26use crate::manifest::storage::utils::sort_manifests;
27use crate::manifest::storage::{file_version, is_delta_file};
28
29/// A simple blob storage for arbitrary binary data in the staging directory.
30///
31/// This is primarily used during repartition operations to store generated
32/// manifests that will be consumed by other regions via [`ApplyStagingManifestRequest`](store_api::region_request::ApplyStagingManifestRequest).
33/// The blobs are stored in `{region_dir}/staging/blob/` directory.
34#[derive(Debug, Clone)]
35pub(crate) struct StagingBlobStorage {
36    object_store: ObjectStore,
37    path: String,
38}
39
40/// Returns the staging path from the blob path.
41///
42/// # Example
43/// - Input: `"data/table/region_0001/manifest/"`
44/// - Output: `"data/table/region_0001/staging/blob/"`
45pub fn staging_blob_path(manifest_path: &str) -> String {
46    let parent_dir = manifest_path
47        .trim_end_matches("manifest/")
48        .trim_end_matches('/');
49    util::normalize_dir(&format!("{}/staging/blob", parent_dir))
50}
51
52impl StagingBlobStorage {
53    pub fn new(path: String, object_store: ObjectStore) -> Self {
54        let path = util::normalize_dir(&path);
55        common_telemetry::debug!(
56            "Staging blob storage path: {}, root: {}",
57            path,
58            object_store.info().root()
59        );
60        Self { object_store, path }
61    }
62
63    /// Returns the path.
64    pub fn path(&self) -> &str {
65        &self.path
66    }
67
68    /// Returns the object store.
69    pub fn object_store(&self) -> &ObjectStore {
70        &self.object_store
71    }
72
73    /// Put the bytes to the blob storage.
74    pub async fn put(&self, path: &str, bytes: Vec<u8>) -> Result<()> {
75        let path = format!("{}{}", self.path, path);
76        common_telemetry::debug!(
77            "Putting blob to staging blob storage, path: {}, root: {}, bytes: {}",
78            path,
79            self.object_store.info().root(),
80            bytes.len()
81        );
82        self.object_store
83            .write(&path, bytes)
84            .await
85            .context(OpenDalSnafu)?;
86        Ok(())
87    }
88
89    /// Get the bytes from the blob storage.
90    pub async fn get(&self, path: &str) -> Result<Vec<u8>> {
91        let path = format!("{}{}", self.path, path);
92        common_telemetry::debug!(
93            "Reading blob from staging blob storage, path: {}, root: {}",
94            path,
95            self.object_store.info().root()
96        );
97        let bytes = self.object_store.read(&path).await.context(OpenDalSnafu)?;
98
99        Ok(bytes.to_vec())
100    }
101}
102
103/// Storage for staging manifest files and blobs used during repartition operations.
104///
105/// Fields:
106/// - `delta_storage`: Manages incremental manifest delta files specific to the staging region.
107/// - `blob_storage`: Manages arbitrary blobs, such as generated manifests for regions.
108///
109/// Directory structure:
110/// - `{region_dir}/staging/manifest/` — for incremental manifest delta files for the staging region.
111/// - `{region_dir}/staging/blob/` — for arbitrary blobs (e.g., generated region manifests).
112#[derive(Debug, Clone)]
113pub(crate) struct StagingStorage {
114    delta_storage: DeltaStorage<NoopTracker>,
115    blob_storage: StagingBlobStorage,
116}
117
118/// Returns the staging path from the manifest path.
119///
120/// # Example
121/// - Input: `"data/table/region_0001/manifest/"`
122/// - Output: `"data/table/region_0001/staging/manifest/"`
123pub fn staging_manifest_path(manifest_path: &str) -> String {
124    let parent_dir = manifest_path
125        .trim_end_matches("manifest/")
126        .trim_end_matches('/');
127    util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
128}
129
130impl StagingStorage {
131    pub fn new(path: String, object_store: ObjectStore, compress_type: CompressionType) -> Self {
132        let staging_blob_path = staging_blob_path(&path);
133        let blob_storage = StagingBlobStorage::new(staging_blob_path, object_store.clone());
134        let staging_manifest_path = staging_manifest_path(&path);
135        let delta_storage = DeltaStorage::new(
136            staging_manifest_path.clone(),
137            object_store.clone(),
138            compress_type,
139            // StagingStorage does not use a manifest cache; set to None.
140            None,
141            // StagingStorage does not track file sizes, since all staging files are
142            // deleted after exiting staging mode.
143            Arc::new(NoopTracker),
144        );
145
146        Self {
147            delta_storage,
148            blob_storage,
149        }
150    }
151
152    /// Returns the blob storage.
153    pub(crate) fn blob_storage(&self) -> &StagingBlobStorage {
154        &self.blob_storage
155    }
156
157    /// Returns an iterator of manifests from staging directory.
158    pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
159        self.delta_storage.manifest_lister().await
160    }
161
162    /// Fetch all staging manifest files and return them as (version, action_list) pairs.
163    pub(crate) async fn fetch_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
164        let manifest_entries = self
165            .delta_storage
166            .get_paths(|entry| {
167                let file_name = entry.name();
168                if is_delta_file(file_name) {
169                    let version = file_version(file_name);
170                    Some((version, entry))
171                } else {
172                    None
173                }
174            })
175            .await?;
176
177        let mut sorted_entries = manifest_entries;
178        sort_manifests(&mut sorted_entries);
179
180        self.delta_storage
181            .fetch_manifests_from_entries(sorted_entries)
182            .await
183    }
184
185    /// Save the delta manifest file.
186    pub(crate) async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
187        self.delta_storage.save(version, bytes).await
188    }
189
190    /// Clean all staging manifest files.
191    pub(crate) async fn clear(&self) -> Result<()> {
192        self.delta_storage
193            .object_store()
194            .remove_all(self.delta_storage.path())
195            .await
196            .context(OpenDalSnafu)?;
197
198        self.blob_storage
199            .object_store()
200            .remove_all(self.blob_storage.path())
201            .await
202            .context(OpenDalSnafu)?;
203
204        debug!(
205            "Cleared all staging manifest files from {}, blob directory: {}",
206            self.delta_storage.path(),
207            self.blob_storage.path()
208        );
209
210        Ok(())
211    }
212}
213
#[cfg(test)]
impl StagingStorage {
    /// Test-only hook that overrides the compression type of the delta storage.
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        let delta_storage = &mut self.delta_storage;
        delta_storage.set_compress_type(compress_type);
    }
}
220
#[cfg(test)]
mod tests {
    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::TryStreamExt;
    use object_store::services::Fs;
    use object_store::{ErrorKind, ObjectStore};

    use super::{StagingStorage, staging_blob_path, staging_manifest_path};

    /// Manifest directory used as input by every test in this module.
    const MANIFEST_PATH: &str = "/data/table/region_0001/manifest/";

    #[test]
    fn test_staging_path() {
        assert_eq!(
            staging_manifest_path(MANIFEST_PATH),
            "/data/table/region_0001/staging/manifest/"
        );
    }

    #[test]
    fn test_staging_blob_path() {
        assert_eq!(
            staging_blob_path(MANIFEST_PATH),
            "/data/table/region_0001/staging/blob/"
        );
    }

    /// Counts the entries listed under `path`; a `NotFound` error counts as zero.
    async fn count_entries(object_store: &ObjectStore, path: &str) -> usize {
        let lister = match object_store.lister_with(path).await {
            Ok(lister) => lister,
            Err(err) if err.kind() == ErrorKind::NotFound => return 0,
            Err(err) => panic!("failed to list {path}: {err}"),
        };
        lister.try_collect::<Vec<_>>().await.unwrap().len()
    }

    /// `clear` must remove both the staging manifest files and the staging blobs.
    #[tokio::test]
    async fn test_staging_clear_removes_blobs() {
        common_telemetry::init_default_ut_logging();

        let tmp_dir = create_temp_dir("test_staging_storage_clear");
        let root = tmp_dir.path().to_string_lossy();
        let object_store = ObjectStore::new(Fs::default().root(&root))
            .unwrap()
            .finish();
        let mut storage = StagingStorage::new(
            MANIFEST_PATH.to_string(),
            object_store.clone(),
            CompressionType::Uncompressed,
        );
        let manifest_dir = staging_manifest_path(MANIFEST_PATH);
        let blob_dir = staging_blob_path(MANIFEST_PATH);

        // Populate both staging directories.
        storage.save(0, b"manifest").await.unwrap();
        let blobs = storage.blob_storage();
        blobs
            .put("region_0002_manifest", b"blob".to_vec())
            .await
            .unwrap();

        assert!(count_entries(&object_store, &manifest_dir).await > 0);
        assert!(count_entries(&object_store, &blob_dir).await > 0);

        storage.clear().await.unwrap();

        // Both directories must be empty (or gone) afterwards.
        assert_eq!(count_entries(&object_store, &manifest_dir).await, 0);
        assert_eq!(count_entries(&object_store, &blob_dir).await, 0);
    }
}