Skip to main content

cli/data/
snapshot_storage.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Storage abstraction for Export/Import V2.
16//!
17//! This module provides a unified interface for reading and writing snapshot data
18//! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).
19
20use async_trait::async_trait;
21use futures::TryStreamExt;
22use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
23use object_store::util::{with_instrument_layers, with_retry_layers};
24use object_store::{
25    AzblobConnection, ErrorKind, GcsConnection, ObjectStore, OssConnection, S3Connection,
26};
27use snafu::ResultExt;
28use url::Url;
29
30use crate::common::ObjectStoreConfig;
31use crate::data::export_v2::error::{
32    BuildObjectStoreSnafu, InvalidUriSnafu, ManifestParseSnafu, ManifestSerializeSnafu, Result,
33    SnapshotNotFoundSnafu, StorageOperationSnafu, TextDecodeSnafu, UnsupportedSchemeSnafu,
34    UrlParseSnafu,
35};
36use crate::data::export_v2::manifest::{MANIFEST_FILE, Manifest};
37#[cfg(test)]
38use crate::data::export_v2::schema::SchemaDefinition;
39use crate::data::export_v2::schema::{SCHEMA_DIR, SCHEMAS_FILE, SchemaSnapshot};
40
/// Bucket (or container) name and root path parsed from a remote snapshot URI.
struct RemoteLocation {
    /// Host component of the URI; assigned to the backend's bucket/container setting.
    bucket_or_container: String,
    /// URI path with leading slashes removed; assigned to the backend's root setting.
    root: String,
}
45
/// URI schemes supported for snapshot storage.
///
/// Each variant corresponds to one storage backend that a snapshot URI can select.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageScheme {
    /// Amazon S3 (`s3://`).
    S3,
    /// Alibaba Cloud OSS (`oss://`).
    Oss,
    /// Google Cloud Storage (`gs://` or `gcs://`).
    Gcs,
    /// Azure Blob Storage (`azblob://`).
    Azblob,
    /// Local filesystem (file://).
    File,
}
60
61impl StorageScheme {
62    /// Parses storage scheme from URI.
63    pub fn from_uri(uri: &str) -> Result<Self> {
64        let url = Url::parse(uri).context(UrlParseSnafu)?;
65
66        match url.scheme() {
67            "s3" => Ok(Self::S3),
68            "oss" => Ok(Self::Oss),
69            "gs" | "gcs" => Ok(Self::Gcs),
70            "azblob" => Ok(Self::Azblob),
71            "file" => Ok(Self::File),
72            scheme => UnsupportedSchemeSnafu { scheme }.fail(),
73        }
74    }
75}
76
77/// Extracts bucket/container and root path from a URI.
78fn extract_remote_location(uri: &str) -> Result<RemoteLocation> {
79    let url = Url::parse(uri).context(UrlParseSnafu)?;
80    let bucket_or_container = url.host_str().unwrap_or("").to_string();
81    if bucket_or_container.is_empty() {
82        return InvalidUriSnafu {
83            uri,
84            reason: "URI must include bucket/container in host",
85        }
86        .fail();
87    }
88
89    let root = url.path().trim_start_matches('/').to_string();
90    if root.is_empty() {
91        return InvalidUriSnafu {
92            uri,
93            reason: "snapshot URI must include a non-empty path after the bucket/container",
94        }
95        .fail();
96    }
97
98    Ok(RemoteLocation {
99        bucket_or_container,
100        root,
101    })
102}
103
104/// Validates that a URI has a proper scheme.
105///
106/// Rejects bare paths (e.g., `/tmp/backup`, `./backup`) because:
107/// - Schema export (CLI) and data export (server) run in different processes
108/// - Using bare paths would split the snapshot across machines
109///
110/// Supported URI schemes:
111/// - `s3://bucket/path` - Amazon S3
112/// - `oss://bucket/path` - Alibaba Cloud OSS
113/// - `gs://bucket/path` - Google Cloud Storage
114/// - `azblob://container/path` - Azure Blob Storage
115/// - `file:///absolute/path` - Local filesystem
116pub fn validate_uri(uri: &str) -> Result<StorageScheme> {
117    // Must have a scheme
118    if !uri.contains("://") {
119        return InvalidUriSnafu {
120            uri,
121            reason: "URI must have a scheme (e.g., s3://, file://). Bare paths are not supported.",
122        }
123        .fail();
124    }
125
126    StorageScheme::from_uri(uri)
127}
128
129fn schema_index_path() -> String {
130    format!("{}/{}", SCHEMA_DIR, SCHEMAS_FILE)
131}
132
133/// Extracts the absolute filesystem path from a file:// URI.
134fn extract_file_path_from_uri(uri: &str) -> Result<String> {
135    let url = Url::parse(uri).context(UrlParseSnafu)?;
136
137    match url.host_str() {
138        Some(host) if !host.is_empty() && host != "localhost" => InvalidUriSnafu {
139            uri,
140            reason: "file:// URI must use an absolute path like file:///tmp/backup",
141        }
142        .fail(),
143        _ => url
144            .to_file_path()
145            .map_err(|_| {
146                InvalidUriSnafu {
147                    uri,
148                    reason: "file:// URI must use an absolute path like file:///tmp/backup",
149                }
150                .build()
151            })
152            .map(|path| path.to_string_lossy().into_owned()),
153    }
154}
155
156async fn ensure_snapshot_exists(storage: &OpenDalStorage) -> Result<()> {
157    if storage.exists().await? {
158        Ok(())
159    } else {
160        SnapshotNotFoundSnafu {
161            uri: storage.target_uri.as_str(),
162        }
163        .fail()
164    }
165}
166
/// Snapshot storage abstraction.
///
/// Provides operations for reading and writing snapshot data to various storage backends.
/// All `path`/`prefix` arguments are relative to the snapshot root.
#[async_trait]
pub trait SnapshotStorage: Send + Sync {
    /// Checks if a snapshot exists at this location (manifest.json exists).
    async fn exists(&self) -> Result<bool>;

    /// Reads the manifest file and returns the parsed [`Manifest`].
    async fn read_manifest(&self) -> Result<Manifest>;

    /// Writes the manifest file.
    async fn write_manifest(&self, manifest: &Manifest) -> Result<()>;

    /// Writes the schema index to schema/schemas.json.
    async fn write_schema(&self, schema: &SchemaSnapshot) -> Result<()>;

    /// Writes a text file to a relative path under the snapshot root.
    async fn write_text(&self, path: &str, content: &str) -> Result<()>;

    /// Reads a text file from a relative path under the snapshot root.
    async fn read_text(&self, path: &str) -> Result<String>;

    /// Creates a directory-like prefix under the snapshot root when needed by the backend.
    async fn create_dir_all(&self, path: &str) -> Result<()>;

    /// Lists files recursively under a relative prefix.
    ///
    /// Returns the paths of the files found.
    async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>>;

    /// Deletes the entire snapshot (for --force).
    async fn delete_snapshot(&self) -> Result<()>;
}
199
/// OpenDAL-based implementation of SnapshotStorage.
///
/// The wrapped operator is built with its root set to the snapshot location, so
/// the paths used by this type are relative to that root.
pub struct OpenDalStorage {
    /// Operator through which every storage call is made.
    object_store: ObjectStore,
    /// URI this storage was created from; reported in snapshot-not-found errors.
    target_uri: String,
}
205
206impl OpenDalStorage {
207    fn new_operator_rooted(object_store: ObjectStore, target_uri: &str) -> Self {
208        Self {
209            object_store,
210            target_uri: target_uri.to_string(),
211        }
212    }
213
214    fn finish_local_store(object_store: ObjectStore) -> ObjectStore {
215        with_instrument_layers(object_store, false)
216    }
217
218    fn finish_remote_store(object_store: ObjectStore) -> ObjectStore {
219        with_instrument_layers(with_retry_layers(object_store), false)
220    }
221
222    fn ensure_backend_enabled(uri: &str, enabled: bool, reason: &'static str) -> Result<()> {
223        if enabled {
224            Ok(())
225        } else {
226            InvalidUriSnafu { uri, reason }.fail()
227        }
228    }
229
230    fn validate_remote_config<E: std::fmt::Display>(
231        uri: &str,
232        backend: &str,
233        result: std::result::Result<(), E>,
234    ) -> Result<()> {
235        result.map_err(|error| {
236            InvalidUriSnafu {
237                uri,
238                reason: format!("invalid {} config: {}", backend, error),
239            }
240            .build()
241        })
242    }
243
244    /// Creates a new storage from a file:// URI.
245    pub fn from_file_uri(uri: &str) -> Result<Self> {
246        let path = extract_file_path_from_uri(uri)?;
247
248        let builder = Fs::default().root(&path);
249        let object_store = ObjectStore::new(builder)
250            .context(BuildObjectStoreSnafu)?
251            .finish();
252        Ok(Self::new_operator_rooted(
253            Self::finish_local_store(object_store),
254            uri,
255        ))
256    }
257
258    fn from_file_uri_with_config(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
259        if storage.enable_s3 || storage.enable_oss || storage.enable_gcs || storage.enable_azblob {
260            return InvalidUriSnafu {
261                uri,
262                reason: "file:// cannot be used with remote storage flags",
263            }
264            .fail();
265        }
266
267        Self::from_file_uri(uri)
268    }
269
270    fn from_s3_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
271        Self::ensure_backend_enabled(
272            uri,
273            storage.enable_s3,
274            "s3:// requires --s3 and related options",
275        )?;
276
277        let location = extract_remote_location(uri)?;
278        let mut config = storage.s3.clone();
279        config.s3_bucket = location.bucket_or_container;
280        config.s3_root = location.root;
281        Self::validate_remote_config(uri, "s3", config.validate())?;
282
283        let conn: S3Connection = config.into();
284        let object_store = ObjectStore::new(S3::from(&conn))
285            .context(BuildObjectStoreSnafu)?
286            .finish();
287        Ok(Self::new_operator_rooted(
288            Self::finish_remote_store(object_store),
289            uri,
290        ))
291    }
292
293    fn from_oss_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
294        Self::ensure_backend_enabled(
295            uri,
296            storage.enable_oss,
297            "oss:// requires --oss and related options",
298        )?;
299
300        let location = extract_remote_location(uri)?;
301        let mut config = storage.oss.clone();
302        config.oss_bucket = location.bucket_or_container;
303        config.oss_root = location.root;
304        Self::validate_remote_config(uri, "oss", config.validate())?;
305
306        let conn: OssConnection = config.into();
307        let object_store = ObjectStore::new(Oss::from(&conn))
308            .context(BuildObjectStoreSnafu)?
309            .finish();
310        Ok(Self::new_operator_rooted(
311            Self::finish_remote_store(object_store),
312            uri,
313        ))
314    }
315
316    fn from_gcs_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
317        Self::ensure_backend_enabled(
318            uri,
319            storage.enable_gcs,
320            "gs:// or gcs:// requires --gcs and related options",
321        )?;
322
323        let location = extract_remote_location(uri)?;
324        let mut config = storage.gcs.clone();
325        config.gcs_bucket = location.bucket_or_container;
326        config.gcs_root = location.root;
327        Self::validate_remote_config(uri, "gcs", config.validate())?;
328
329        let conn: GcsConnection = config.into();
330        let object_store = ObjectStore::new(Gcs::from(&conn))
331            .context(BuildObjectStoreSnafu)?
332            .finish();
333        Ok(Self::new_operator_rooted(
334            Self::finish_remote_store(object_store),
335            uri,
336        ))
337    }
338
339    fn from_azblob_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
340        Self::ensure_backend_enabled(
341            uri,
342            storage.enable_azblob,
343            "azblob:// requires --azblob and related options",
344        )?;
345
346        let location = extract_remote_location(uri)?;
347        let mut config = storage.azblob.clone();
348        config.azblob_container = location.bucket_or_container;
349        config.azblob_root = location.root;
350        Self::validate_remote_config(uri, "azblob", config.validate())?;
351
352        let conn: AzblobConnection = config.into();
353        let object_store = ObjectStore::new(Azblob::from(&conn))
354            .context(BuildObjectStoreSnafu)?
355            .finish();
356        Ok(Self::new_operator_rooted(
357            Self::finish_remote_store(object_store),
358            uri,
359        ))
360    }
361
362    /// Creates a new storage from a URI and object store config.
363    pub fn from_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
364        match StorageScheme::from_uri(uri)? {
365            StorageScheme::File => Self::from_file_uri_with_config(uri, storage),
366            StorageScheme::S3 => Self::from_s3_uri(uri, storage),
367            StorageScheme::Oss => Self::from_oss_uri(uri, storage),
368            StorageScheme::Gcs => Self::from_gcs_uri(uri, storage),
369            StorageScheme::Azblob => Self::from_azblob_uri(uri, storage),
370        }
371    }
372
373    /// Reads a file as bytes.
374    async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
375        let data = self
376            .object_store
377            .read(path)
378            .await
379            .context(StorageOperationSnafu {
380                operation: format!("read {}", path),
381            })?;
382        Ok(data.to_vec())
383    }
384
385    /// Writes bytes to a file.
386    async fn write_file(&self, path: &str, data: Vec<u8>) -> Result<()> {
387        self.object_store
388            .write(path, data)
389            .await
390            .map(|_| ())
391            .context(StorageOperationSnafu {
392                operation: format!("write {}", path),
393            })
394    }
395
396    /// Checks if a file exists using stat.
397    async fn file_exists(&self, path: &str) -> Result<bool> {
398        match self.object_store.stat(path).await {
399            Ok(_) => Ok(true),
400            Err(e) if e.kind() == object_store::ErrorKind::NotFound => Ok(false),
401            Err(e) => Err(e).context(StorageOperationSnafu {
402                operation: format!("check exists {}", path),
403            }),
404        }
405    }
406
407    #[cfg(test)]
408    pub async fn read_schema(&self) -> Result<SchemaSnapshot> {
409        let schemas_path = schema_index_path();
410        let schemas: Vec<SchemaDefinition> = if self.file_exists(&schemas_path).await? {
411            let data = self.read_file(&schemas_path).await?;
412            serde_json::from_slice(&data).context(ManifestParseSnafu)?
413        } else {
414            vec![]
415        };
416
417        Ok(SchemaSnapshot { schemas })
418    }
419}
420
421#[async_trait]
422impl SnapshotStorage for OpenDalStorage {
423    async fn exists(&self) -> Result<bool> {
424        self.file_exists(MANIFEST_FILE).await
425    }
426
427    async fn read_manifest(&self) -> Result<Manifest> {
428        ensure_snapshot_exists(self).await?;
429
430        let data = self.read_file(MANIFEST_FILE).await?;
431        serde_json::from_slice(&data).context(ManifestParseSnafu)
432    }
433
434    async fn write_manifest(&self, manifest: &Manifest) -> Result<()> {
435        let data = serde_json::to_vec_pretty(manifest).context(ManifestSerializeSnafu)?;
436        self.write_file(MANIFEST_FILE, data).await
437    }
438
439    async fn write_schema(&self, schema: &SchemaSnapshot) -> Result<()> {
440        let schemas_path = schema_index_path();
441        let schemas_data =
442            serde_json::to_vec_pretty(&schema.schemas).context(ManifestSerializeSnafu)?;
443        self.write_file(&schemas_path, schemas_data).await
444    }
445
446    async fn write_text(&self, path: &str, content: &str) -> Result<()> {
447        self.write_file(path, content.as_bytes().to_vec()).await
448    }
449
450    async fn read_text(&self, path: &str) -> Result<String> {
451        let data = self.read_file(path).await?;
452        String::from_utf8(data).context(TextDecodeSnafu)
453    }
454
455    async fn create_dir_all(&self, path: &str) -> Result<()> {
456        self.object_store
457            .create_dir(path)
458            .await
459            .context(StorageOperationSnafu {
460                operation: format!("create dir {}", path),
461            })
462    }
463
464    async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>> {
465        let mut lister = match self.object_store.lister_with(prefix).recursive(true).await {
466            Ok(lister) => lister,
467            Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
468            Err(error) => {
469                return Err(error).context(StorageOperationSnafu {
470                    operation: format!("list {}", prefix),
471                });
472            }
473        };
474
475        let mut files = Vec::new();
476        while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
477            operation: format!("list {}", prefix),
478        })? {
479            if entry.metadata().is_dir() {
480                continue;
481            }
482            files.push(entry.path().to_string());
483        }
484        Ok(files)
485    }
486
487    async fn delete_snapshot(&self) -> Result<()> {
488        self.object_store
489            .remove_all("/")
490            .await
491            .context(StorageOperationSnafu {
492                operation: "delete snapshot",
493            })
494    }
495}
496
// Unit tests for URI validation/parsing and for `OpenDalStorage` backed by a
// temporary local directory.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::path::Path;

    use object_store::ObjectStore;
    use object_store::services::Fs;
    use tempfile::tempdir;
    use url::Url;

    use super::*;
    use crate::data::export_v2::manifest::{DataFormat, TimeRange};
    use crate::data::export_v2::schema::SchemaDefinition;

    /// Builds a storage whose filesystem operator is rooted at `dir`.
    fn make_storage_with_rooted_fs(dir: &std::path::Path) -> OpenDalStorage {
        let object_store = ObjectStore::new(Fs::default().root(dir.to_str().unwrap()))
            .unwrap()
            .finish();
        OpenDalStorage::new_operator_rooted(
            OpenDalStorage::finish_local_store(object_store),
            Url::from_directory_path(dir).unwrap().as_ref(),
        )
    }

    /// Every supported scheme maps to its `StorageScheme` variant.
    #[test]
    fn test_validate_uri_valid() {
        assert_eq!(validate_uri("s3://bucket/path").unwrap(), StorageScheme::S3);
        assert_eq!(
            validate_uri("oss://bucket/path").unwrap(),
            StorageScheme::Oss
        );
        assert_eq!(
            validate_uri("gs://bucket/path").unwrap(),
            StorageScheme::Gcs
        );
        assert_eq!(
            validate_uri("gcs://bucket/path").unwrap(),
            StorageScheme::Gcs
        );
        assert_eq!(
            validate_uri("azblob://container/path").unwrap(),
            StorageScheme::Azblob
        );
        assert_eq!(
            validate_uri("file:///tmp/backup").unwrap(),
            StorageScheme::File
        );
    }

    /// Bare paths and unknown schemes are rejected.
    #[test]
    fn test_validate_uri_invalid() {
        // Bare paths should be rejected
        assert!(validate_uri("/tmp/backup").is_err());
        assert!(validate_uri("./backup").is_err());
        assert!(validate_uri("backup").is_err());

        // Unknown schemes
        assert!(validate_uri("ftp://server/path").is_err());
    }

    /// A remote URI without a path after the bucket/container is rejected.
    #[test]
    fn test_extract_remote_location_requires_non_empty_root() {
        assert!(extract_remote_location("s3://bucket").is_err());
        assert!(extract_remote_location("s3://bucket/").is_err());
        assert!(extract_remote_location("oss://bucket").is_err());
        assert!(extract_remote_location("gs://bucket").is_err());
        assert!(extract_remote_location("azblob://container").is_err());
    }

    /// Unix-style file URIs: empty and `localhost` hosts, plus percent-decoding.
    #[cfg(not(windows))]
    #[test]
    fn test_extract_path_from_uri_unix_examples() {
        assert_eq!(
            extract_file_path_from_uri("file:///tmp/backup").unwrap(),
            "/tmp/backup"
        );
        assert_eq!(
            extract_file_path_from_uri("file://localhost/tmp/backup").unwrap(),
            "/tmp/backup"
        );
        assert_eq!(
            extract_file_path_from_uri("file:///tmp/my%20backup").unwrap(),
            "/tmp/my backup"
        );
        assert_eq!(
            extract_file_path_from_uri("file://localhost/tmp/my%20backup").unwrap(),
            "/tmp/my backup"
        );
    }

    /// `file://tmp/backup` parses `tmp` as a host and must be rejected.
    #[test]
    fn test_extract_file_path_from_uri_rejects_file_host() {
        assert!(extract_file_path_from_uri("file://tmp/backup").is_err());
    }

    /// A URL built from a real directory converts back to that directory.
    #[test]
    fn test_extract_file_path_from_uri_round_trips_directory_url() {
        let dir = tempdir().unwrap();
        let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
        let path = extract_file_path_from_uri(&uri).unwrap();

        assert_eq!(Path::new(&path), dir.path());
    }

    /// Reading a missing manifest yields an error that mentions the requested URI.
    #[tokio::test]
    async fn test_read_manifest_reports_requested_uri() {
        let dir = tempdir().unwrap();
        let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
        let storage = OpenDalStorage::from_file_uri(&uri).unwrap();

        let error = storage.read_manifest().await.unwrap_err().to_string();

        assert!(error.contains(uri.as_str()));
    }

    /// A written manifest reads back with the same fields.
    #[tokio::test]
    async fn test_manifest_round_trip() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        let manifest = Manifest::new_full(
            "greptime".to_string(),
            vec!["public".to_string()],
            TimeRange::unbounded(),
            DataFormat::Parquet,
        );

        storage.write_manifest(&manifest).await.unwrap();
        let loaded = storage.read_manifest().await.unwrap();

        assert_eq!(loaded.catalog, manifest.catalog);
        assert_eq!(loaded.schemas, manifest.schemas);
        assert_eq!(loaded.schema_only, manifest.schema_only);
        assert_eq!(loaded.format, manifest.format);
        assert_eq!(loaded.snapshot_id, manifest.snapshot_id);
    }

    /// A written schema snapshot reads back equal to the original.
    #[tokio::test]
    async fn test_schema_round_trip() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        let mut snapshot = SchemaSnapshot::new();
        snapshot.add_schema(SchemaDefinition {
            catalog: "greptime".to_string(),
            name: "test_db".to_string(),
            options: HashMap::from([("ttl".to_string(), "7d".to_string())]),
        });

        storage.write_schema(&snapshot).await.unwrap();
        let loaded = storage.read_schema().await.unwrap();

        assert_eq!(loaded, snapshot);
    }

    /// Text written to a nested relative path reads back unchanged.
    #[tokio::test]
    async fn test_text_round_trip() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());
        let content = "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX);";

        storage
            .write_text("schema/ddl/public.sql", content)
            .await
            .unwrap();
        let loaded = storage.read_text("schema/ddl/public.sql").await.unwrap();

        assert_eq!(loaded, content);
    }

    /// Non-UTF-8 file content makes `read_text` fail with a UTF-8 related error.
    #[tokio::test]
    async fn test_read_text_rejects_invalid_utf8() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        storage
            .write_file("schema/ddl/public.sql", vec![0xff, 0xfe, 0xfd])
            .await
            .unwrap();

        let error = storage
            .read_text("schema/ddl/public.sql")
            .await
            .unwrap_err();
        assert!(error.to_string().contains("UTF-8"));
    }

    /// `exists` is false before the manifest is written and true afterwards.
    #[tokio::test]
    async fn test_exists_follows_manifest_presence() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        assert!(!storage.exists().await.unwrap());

        storage
            .write_manifest(&Manifest::new_schema_only(
                "greptime".to_string(),
                vec!["public".to_string()],
            ))
            .await
            .unwrap();

        assert!(storage.exists().await.unwrap());
    }

    /// Deleting a snapshot must not touch files outside the storage root.
    #[tokio::test]
    async fn test_delete_snapshot_only_removes_rooted_contents() {
        let parent = tempdir().unwrap();
        let snapshot_root = parent.path().join("snapshot");
        let sibling = parent.path().join("sibling");
        std::fs::create_dir_all(&snapshot_root).unwrap();
        std::fs::create_dir_all(&sibling).unwrap();
        std::fs::write(snapshot_root.join("manifest.json"), b"{}").unwrap();
        std::fs::write(sibling.join("keep.txt"), b"keep").unwrap();

        let storage = make_storage_with_rooted_fs(&snapshot_root);
        storage.delete_snapshot().await.unwrap();

        assert!(!snapshot_root.join("manifest.json").exists());
        assert!(sibling.join("keep.txt").exists());
    }
}