1use async_trait::async_trait;
21use futures::TryStreamExt;
22use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
23use object_store::util::{with_instrument_layers, with_retry_layers};
24use object_store::{
25 AzblobConnection, ErrorKind, GcsConnection, ObjectStore, OssConnection, S3Connection,
26};
27use snafu::ResultExt;
28use url::Url;
29
30use crate::common::ObjectStoreConfig;
31use crate::data::export_v2::error::{
32 BuildObjectStoreSnafu, InvalidUriSnafu, ManifestParseSnafu, ManifestSerializeSnafu, Result,
33 SnapshotNotFoundSnafu, StorageOperationSnafu, TextDecodeSnafu, UnsupportedSchemeSnafu,
34 UrlParseSnafu,
35};
36use crate::data::export_v2::manifest::{MANIFEST_FILE, Manifest};
37#[cfg(test)]
38use crate::data::export_v2::schema::SchemaDefinition;
39use crate::data::export_v2::schema::{SCHEMA_DIR, SCHEMAS_FILE, SchemaSnapshot};
40
/// Bucket/container and object root extracted from a remote snapshot URI
/// (e.g. `s3://bucket/path` yields bucket `bucket` and root `path`).
struct RemoteLocation {
    // Bucket (S3/OSS/GCS) or container (Azure Blob) name, taken from the URI host.
    bucket_or_container: String,
    // Path inside the bucket/container, with the leading `/` stripped.
    root: String,
}
45
/// Storage backend scheme recognized in a snapshot URI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageScheme {
    /// Amazon S3 (`s3://`).
    S3,
    /// Alibaba Cloud OSS (`oss://`).
    Oss,
    /// Google Cloud Storage (`gs://` or `gcs://`).
    Gcs,
    /// Azure Blob Storage (`azblob://`).
    Azblob,
    /// Local filesystem (`file://`).
    File,
}
60
61impl StorageScheme {
62 pub fn from_uri(uri: &str) -> Result<Self> {
64 let url = Url::parse(uri).context(UrlParseSnafu)?;
65
66 match url.scheme() {
67 "s3" => Ok(Self::S3),
68 "oss" => Ok(Self::Oss),
69 "gs" | "gcs" => Ok(Self::Gcs),
70 "azblob" => Ok(Self::Azblob),
71 "file" => Ok(Self::File),
72 scheme => UnsupportedSchemeSnafu { scheme }.fail(),
73 }
74 }
75}
76
77fn extract_remote_location(uri: &str) -> Result<RemoteLocation> {
79 let url = Url::parse(uri).context(UrlParseSnafu)?;
80 let bucket_or_container = url.host_str().unwrap_or("").to_string();
81 if bucket_or_container.is_empty() {
82 return InvalidUriSnafu {
83 uri,
84 reason: "URI must include bucket/container in host",
85 }
86 .fail();
87 }
88
89 let root = url.path().trim_start_matches('/').to_string();
90 if root.is_empty() {
91 return InvalidUriSnafu {
92 uri,
93 reason: "snapshot URI must include a non-empty path after the bucket/container",
94 }
95 .fail();
96 }
97
98 Ok(RemoteLocation {
99 bucket_or_container,
100 root,
101 })
102}
103
104pub fn validate_uri(uri: &str) -> Result<StorageScheme> {
117 if !uri.contains("://") {
119 return InvalidUriSnafu {
120 uri,
121 reason: "URI must have a scheme (e.g., s3://, file://). Bare paths are not supported.",
122 }
123 .fail();
124 }
125
126 StorageScheme::from_uri(uri)
127}
128
129fn schema_index_path() -> String {
130 format!("{}/{}", SCHEMA_DIR, SCHEMAS_FILE)
131}
132
133fn extract_file_path_from_uri(uri: &str) -> Result<String> {
135 let url = Url::parse(uri).context(UrlParseSnafu)?;
136
137 match url.host_str() {
138 Some(host) if !host.is_empty() && host != "localhost" => InvalidUriSnafu {
139 uri,
140 reason: "file:// URI must use an absolute path like file:///tmp/backup",
141 }
142 .fail(),
143 _ => url
144 .to_file_path()
145 .map_err(|_| {
146 InvalidUriSnafu {
147 uri,
148 reason: "file:// URI must use an absolute path like file:///tmp/backup",
149 }
150 .build()
151 })
152 .map(|path| path.to_string_lossy().into_owned()),
153 }
154}
155
156async fn ensure_snapshot_exists(storage: &OpenDalStorage) -> Result<()> {
157 if storage.exists().await? {
158 Ok(())
159 } else {
160 SnapshotNotFoundSnafu {
161 uri: storage.target_uri.as_str(),
162 }
163 .fail()
164 }
165}
166
/// Backend-agnostic interface for reading and writing snapshot contents.
///
/// All `path`/`prefix` arguments are relative to the snapshot root.
#[async_trait]
pub trait SnapshotStorage: Send + Sync {
    /// Returns `true` when a snapshot is present at the target location.
    async fn exists(&self) -> Result<bool>;

    /// Reads and parses the snapshot manifest.
    async fn read_manifest(&self) -> Result<Manifest>;

    /// Serializes and writes the snapshot manifest.
    async fn write_manifest(&self, manifest: &Manifest) -> Result<()>;

    /// Writes the schema snapshot index.
    async fn write_schema(&self, schema: &SchemaSnapshot) -> Result<()>;

    /// Writes `content` as a UTF-8 text file at `path`.
    async fn write_text(&self, path: &str, content: &str) -> Result<()>;

    /// Reads the file at `path` and decodes it as UTF-8 text.
    async fn read_text(&self, path: &str) -> Result<String>;

    /// Creates the directory at `path`.
    async fn create_dir_all(&self, path: &str) -> Result<()>;

    /// Lists all files (directories excluded) under `prefix`, recursively.
    async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>>;

    /// Deletes the entire snapshot under the storage root.
    async fn delete_snapshot(&self) -> Result<()>;
}
199
/// [`SnapshotStorage`] implementation backed by an [`ObjectStore`] operator
/// whose root points directly at the snapshot directory.
pub struct OpenDalStorage {
    // Operator rooted at the snapshot directory; all paths used with it are
    // relative to that root.
    object_store: ObjectStore,
    // Original user-supplied URI, kept only for error messages.
    target_uri: String,
}
205
206impl OpenDalStorage {
207 fn new_operator_rooted(object_store: ObjectStore, target_uri: &str) -> Self {
208 Self {
209 object_store,
210 target_uri: target_uri.to_string(),
211 }
212 }
213
214 fn finish_local_store(object_store: ObjectStore) -> ObjectStore {
215 with_instrument_layers(object_store, false)
216 }
217
218 fn finish_remote_store(object_store: ObjectStore) -> ObjectStore {
219 with_instrument_layers(with_retry_layers(object_store), false)
220 }
221
222 fn ensure_backend_enabled(uri: &str, enabled: bool, reason: &'static str) -> Result<()> {
223 if enabled {
224 Ok(())
225 } else {
226 InvalidUriSnafu { uri, reason }.fail()
227 }
228 }
229
230 fn validate_remote_config<E: std::fmt::Display>(
231 uri: &str,
232 backend: &str,
233 result: std::result::Result<(), E>,
234 ) -> Result<()> {
235 result.map_err(|error| {
236 InvalidUriSnafu {
237 uri,
238 reason: format!("invalid {} config: {}", backend, error),
239 }
240 .build()
241 })
242 }
243
244 pub fn from_file_uri(uri: &str) -> Result<Self> {
246 let path = extract_file_path_from_uri(uri)?;
247
248 let builder = Fs::default().root(&path);
249 let object_store = ObjectStore::new(builder)
250 .context(BuildObjectStoreSnafu)?
251 .finish();
252 Ok(Self::new_operator_rooted(
253 Self::finish_local_store(object_store),
254 uri,
255 ))
256 }
257
258 fn from_file_uri_with_config(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
259 if storage.enable_s3 || storage.enable_oss || storage.enable_gcs || storage.enable_azblob {
260 return InvalidUriSnafu {
261 uri,
262 reason: "file:// cannot be used with remote storage flags",
263 }
264 .fail();
265 }
266
267 Self::from_file_uri(uri)
268 }
269
270 fn from_s3_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
271 Self::ensure_backend_enabled(
272 uri,
273 storage.enable_s3,
274 "s3:// requires --s3 and related options",
275 )?;
276
277 let location = extract_remote_location(uri)?;
278 let mut config = storage.s3.clone();
279 config.s3_bucket = location.bucket_or_container;
280 config.s3_root = location.root;
281 Self::validate_remote_config(uri, "s3", config.validate())?;
282
283 let conn: S3Connection = config.into();
284 let object_store = ObjectStore::new(S3::from(&conn))
285 .context(BuildObjectStoreSnafu)?
286 .finish();
287 Ok(Self::new_operator_rooted(
288 Self::finish_remote_store(object_store),
289 uri,
290 ))
291 }
292
293 fn from_oss_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
294 Self::ensure_backend_enabled(
295 uri,
296 storage.enable_oss,
297 "oss:// requires --oss and related options",
298 )?;
299
300 let location = extract_remote_location(uri)?;
301 let mut config = storage.oss.clone();
302 config.oss_bucket = location.bucket_or_container;
303 config.oss_root = location.root;
304 Self::validate_remote_config(uri, "oss", config.validate())?;
305
306 let conn: OssConnection = config.into();
307 let object_store = ObjectStore::new(Oss::from(&conn))
308 .context(BuildObjectStoreSnafu)?
309 .finish();
310 Ok(Self::new_operator_rooted(
311 Self::finish_remote_store(object_store),
312 uri,
313 ))
314 }
315
316 fn from_gcs_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
317 Self::ensure_backend_enabled(
318 uri,
319 storage.enable_gcs,
320 "gs:// or gcs:// requires --gcs and related options",
321 )?;
322
323 let location = extract_remote_location(uri)?;
324 let mut config = storage.gcs.clone();
325 config.gcs_bucket = location.bucket_or_container;
326 config.gcs_root = location.root;
327 Self::validate_remote_config(uri, "gcs", config.validate())?;
328
329 let conn: GcsConnection = config.into();
330 let object_store = ObjectStore::new(Gcs::from(&conn))
331 .context(BuildObjectStoreSnafu)?
332 .finish();
333 Ok(Self::new_operator_rooted(
334 Self::finish_remote_store(object_store),
335 uri,
336 ))
337 }
338
339 fn from_azblob_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
340 Self::ensure_backend_enabled(
341 uri,
342 storage.enable_azblob,
343 "azblob:// requires --azblob and related options",
344 )?;
345
346 let location = extract_remote_location(uri)?;
347 let mut config = storage.azblob.clone();
348 config.azblob_container = location.bucket_or_container;
349 config.azblob_root = location.root;
350 Self::validate_remote_config(uri, "azblob", config.validate())?;
351
352 let conn: AzblobConnection = config.into();
353 let object_store = ObjectStore::new(Azblob::from(&conn))
354 .context(BuildObjectStoreSnafu)?
355 .finish();
356 Ok(Self::new_operator_rooted(
357 Self::finish_remote_store(object_store),
358 uri,
359 ))
360 }
361
362 pub fn from_uri(uri: &str, storage: &ObjectStoreConfig) -> Result<Self> {
364 match StorageScheme::from_uri(uri)? {
365 StorageScheme::File => Self::from_file_uri_with_config(uri, storage),
366 StorageScheme::S3 => Self::from_s3_uri(uri, storage),
367 StorageScheme::Oss => Self::from_oss_uri(uri, storage),
368 StorageScheme::Gcs => Self::from_gcs_uri(uri, storage),
369 StorageScheme::Azblob => Self::from_azblob_uri(uri, storage),
370 }
371 }
372
373 async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
375 let data = self
376 .object_store
377 .read(path)
378 .await
379 .context(StorageOperationSnafu {
380 operation: format!("read {}", path),
381 })?;
382 Ok(data.to_vec())
383 }
384
385 async fn write_file(&self, path: &str, data: Vec<u8>) -> Result<()> {
387 self.object_store
388 .write(path, data)
389 .await
390 .map(|_| ())
391 .context(StorageOperationSnafu {
392 operation: format!("write {}", path),
393 })
394 }
395
396 async fn file_exists(&self, path: &str) -> Result<bool> {
398 match self.object_store.stat(path).await {
399 Ok(_) => Ok(true),
400 Err(e) if e.kind() == object_store::ErrorKind::NotFound => Ok(false),
401 Err(e) => Err(e).context(StorageOperationSnafu {
402 operation: format!("check exists {}", path),
403 }),
404 }
405 }
406
407 #[cfg(test)]
408 pub async fn read_schema(&self) -> Result<SchemaSnapshot> {
409 let schemas_path = schema_index_path();
410 let schemas: Vec<SchemaDefinition> = if self.file_exists(&schemas_path).await? {
411 let data = self.read_file(&schemas_path).await?;
412 serde_json::from_slice(&data).context(ManifestParseSnafu)?
413 } else {
414 vec![]
415 };
416
417 Ok(SchemaSnapshot { schemas })
418 }
419}
420
#[async_trait]
impl SnapshotStorage for OpenDalStorage {
    // A snapshot is considered present iff its manifest file exists.
    async fn exists(&self) -> Result<bool> {
        self.file_exists(MANIFEST_FILE).await
    }

    async fn read_manifest(&self) -> Result<Manifest> {
        // Check first so a missing manifest surfaces as SnapshotNotFound
        // (with the original URI) rather than a raw read error.
        ensure_snapshot_exists(self).await?;

        let data = self.read_file(MANIFEST_FILE).await?;
        serde_json::from_slice(&data).context(ManifestParseSnafu)
    }

    async fn write_manifest(&self, manifest: &Manifest) -> Result<()> {
        // Pretty-printed so the manifest stays human-readable in the snapshot.
        let data = serde_json::to_vec_pretty(manifest).context(ManifestSerializeSnafu)?;
        self.write_file(MANIFEST_FILE, data).await
    }

    async fn write_schema(&self, schema: &SchemaSnapshot) -> Result<()> {
        // Only the inner schema list is persisted; `read_schema` rebuilds
        // the SchemaSnapshot wrapper on load.
        let schemas_path = schema_index_path();
        let schemas_data =
            serde_json::to_vec_pretty(&schema.schemas).context(ManifestSerializeSnafu)?;
        self.write_file(&schemas_path, schemas_data).await
    }

    async fn write_text(&self, path: &str, content: &str) -> Result<()> {
        self.write_file(path, content.as_bytes().to_vec()).await
    }

    async fn read_text(&self, path: &str) -> Result<String> {
        // Invalid UTF-8 is reported as a decode error, not silently lossy-converted.
        let data = self.read_file(path).await?;
        String::from_utf8(data).context(TextDecodeSnafu)
    }

    async fn create_dir_all(&self, path: &str) -> Result<()> {
        self.object_store
            .create_dir(path)
            .await
            .context(StorageOperationSnafu {
                operation: format!("create dir {}", path),
            })
    }

    async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>> {
        // A missing prefix is treated as "no files", not an error.
        let mut lister = match self.object_store.lister_with(prefix).recursive(true).await {
            Ok(lister) => lister,
            Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
            Err(error) => {
                return Err(error).context(StorageOperationSnafu {
                    operation: format!("list {}", prefix),
                });
            }
        };

        let mut files = Vec::new();
        while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
            operation: format!("list {}", prefix),
        })? {
            // Directories are skipped; only file entries are returned.
            if entry.metadata().is_dir() {
                continue;
            }
            files.push(entry.path().to_string());
        }
        Ok(files)
    }

    async fn delete_snapshot(&self) -> Result<()> {
        // "/" here is relative to the operator root (the snapshot directory),
        // so only the snapshot's own contents are removed — siblings of the
        // root are untouched (covered by a test below).
        self.object_store
            .remove_all("/")
            .await
            .context(StorageOperationSnafu {
                operation: "delete snapshot",
            })
    }
}
496
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::path::Path;

    use object_store::ObjectStore;
    use object_store::services::Fs;
    use tempfile::tempdir;
    use url::Url;

    use super::*;
    use crate::data::export_v2::manifest::{DataFormat, TimeRange};
    use crate::data::export_v2::schema::SchemaDefinition;

    /// Builds an `OpenDalStorage` over a local Fs operator rooted at `dir`.
    fn make_storage_with_rooted_fs(dir: &std::path::Path) -> OpenDalStorage {
        let object_store = ObjectStore::new(Fs::default().root(dir.to_str().unwrap()))
            .unwrap()
            .finish();
        OpenDalStorage::new_operator_rooted(
            OpenDalStorage::finish_local_store(object_store),
            Url::from_directory_path(dir).unwrap().as_ref(),
        )
    }

    // Every supported scheme maps to its StorageScheme variant.
    #[test]
    fn test_validate_uri_valid() {
        assert_eq!(validate_uri("s3://bucket/path").unwrap(), StorageScheme::S3);
        assert_eq!(
            validate_uri("oss://bucket/path").unwrap(),
            StorageScheme::Oss
        );
        assert_eq!(
            validate_uri("gs://bucket/path").unwrap(),
            StorageScheme::Gcs
        );
        assert_eq!(
            validate_uri("gcs://bucket/path").unwrap(),
            StorageScheme::Gcs
        );
        assert_eq!(
            validate_uri("azblob://container/path").unwrap(),
            StorageScheme::Azblob
        );
        assert_eq!(
            validate_uri("file:///tmp/backup").unwrap(),
            StorageScheme::File
        );
    }

    // Bare paths and unknown schemes are rejected.
    #[test]
    fn test_validate_uri_invalid() {
        assert!(validate_uri("/tmp/backup").is_err());
        assert!(validate_uri("./backup").is_err());
        assert!(validate_uri("backup").is_err());

        assert!(validate_uri("ftp://server/path").is_err());
    }

    // Remote URIs must carry a non-empty path after the bucket/container.
    #[test]
    fn test_extract_remote_location_requires_non_empty_root() {
        assert!(extract_remote_location("s3://bucket").is_err());
        assert!(extract_remote_location("s3://bucket/").is_err());
        assert!(extract_remote_location("oss://bucket").is_err());
        assert!(extract_remote_location("gs://bucket").is_err());
        assert!(extract_remote_location("azblob://container").is_err());
    }

    // file:// paths decode correctly, including percent-encoding and the
    // "localhost" host form (unix path shapes, so skipped on windows).
    #[cfg(not(windows))]
    #[test]
    fn test_extract_path_from_uri_unix_examples() {
        assert_eq!(
            extract_file_path_from_uri("file:///tmp/backup").unwrap(),
            "/tmp/backup"
        );
        assert_eq!(
            extract_file_path_from_uri("file://localhost/tmp/backup").unwrap(),
            "/tmp/backup"
        );
        assert_eq!(
            extract_file_path_from_uri("file:///tmp/my%20backup").unwrap(),
            "/tmp/my backup"
        );
        assert_eq!(
            extract_file_path_from_uri("file://localhost/tmp/my%20backup").unwrap(),
            "/tmp/my backup"
        );
    }

    // A non-localhost host (a likely missing-slash typo) is rejected.
    #[test]
    fn test_extract_file_path_from_uri_rejects_file_host() {
        assert!(extract_file_path_from_uri("file://tmp/backup").is_err());
    }

    #[test]
    fn test_extract_file_path_from_uri_round_trips_directory_url() {
        let dir = tempdir().unwrap();
        let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
        let path = extract_file_path_from_uri(&uri).unwrap();

        assert_eq!(Path::new(&path), dir.path());
    }

    // The SnapshotNotFound error must echo the URI the user asked for.
    #[tokio::test]
    async fn test_read_manifest_reports_requested_uri() {
        let dir = tempdir().unwrap();
        let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
        let storage = OpenDalStorage::from_file_uri(&uri).unwrap();

        let error = storage.read_manifest().await.unwrap_err().to_string();

        assert!(error.contains(uri.as_str()));
    }

    #[tokio::test]
    async fn test_manifest_round_trip() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        let manifest = Manifest::new_full(
            "greptime".to_string(),
            vec!["public".to_string()],
            TimeRange::unbounded(),
            DataFormat::Parquet,
        );

        storage.write_manifest(&manifest).await.unwrap();
        let loaded = storage.read_manifest().await.unwrap();

        assert_eq!(loaded.catalog, manifest.catalog);
        assert_eq!(loaded.schemas, manifest.schemas);
        assert_eq!(loaded.schema_only, manifest.schema_only);
        assert_eq!(loaded.format, manifest.format);
        assert_eq!(loaded.snapshot_id, manifest.snapshot_id);
    }

    #[tokio::test]
    async fn test_schema_round_trip() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        let mut snapshot = SchemaSnapshot::new();
        snapshot.add_schema(SchemaDefinition {
            catalog: "greptime".to_string(),
            name: "test_db".to_string(),
            options: HashMap::from([("ttl".to_string(), "7d".to_string())]),
        });

        storage.write_schema(&snapshot).await.unwrap();
        let loaded = storage.read_schema().await.unwrap();

        assert_eq!(loaded, snapshot);
    }

    #[tokio::test]
    async fn test_text_round_trip() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());
        let content = "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX);";

        storage
            .write_text("schema/ddl/public.sql", content)
            .await
            .unwrap();
        let loaded = storage.read_text("schema/ddl/public.sql").await.unwrap();

        assert_eq!(loaded, content);
    }

    // read_text must fail loudly on invalid UTF-8 rather than lossy-decode.
    #[tokio::test]
    async fn test_read_text_rejects_invalid_utf8() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        storage
            .write_file("schema/ddl/public.sql", vec![0xff, 0xfe, 0xfd])
            .await
            .unwrap();

        let error = storage
            .read_text("schema/ddl/public.sql")
            .await
            .unwrap_err();
        assert!(error.to_string().contains("UTF-8"));
    }

    // exists() tracks the presence of the manifest file.
    #[tokio::test]
    async fn test_exists_follows_manifest_presence() {
        let dir = tempdir().unwrap();
        let storage = make_storage_with_rooted_fs(dir.path());

        assert!(!storage.exists().await.unwrap());

        storage
            .write_manifest(&Manifest::new_schema_only(
                "greptime".to_string(),
                vec!["public".to_string()],
            ))
            .await
            .unwrap();

        assert!(storage.exists().await.unwrap());
    }

    // delete_snapshot must be confined to the operator root: siblings of the
    // snapshot directory survive.
    #[tokio::test]
    async fn test_delete_snapshot_only_removes_rooted_contents() {
        let parent = tempdir().unwrap();
        let snapshot_root = parent.path().join("snapshot");
        let sibling = parent.path().join("sibling");
        std::fs::create_dir_all(&snapshot_root).unwrap();
        std::fs::create_dir_all(&sibling).unwrap();
        std::fs::write(snapshot_root.join("manifest.json"), b"{}").unwrap();
        std::fs::write(sibling.join("keep.txt"), b"keep").unwrap();

        let storage = make_storage_with_rooted_fs(&snapshot_root);
        storage.delete_snapshot().await.unwrap();

        assert!(!snapshot_root.join("manifest.json").exists());
        assert!(sibling.join("keep.txt").exists());
    }
}