common_datasource/
object_store.rs1pub mod azblob;
16pub mod fs;
17pub mod gcs;
18pub mod oss;
19pub mod s3;
20
21use std::collections::HashMap;
22
23use lazy_static::lazy_static;
24use object_store::ObjectStore;
25use regex::Regex;
26use snafu::{OptionExt, ResultExt};
27use url::{ParseError, Url};
28
29use self::azblob::build_azblob_backend;
30use self::fs::build_fs_backend;
31use self::gcs::build_gcs_backend;
32use self::s3::build_s3_backend;
33use crate::error::{self, Result};
34use crate::object_store::oss::build_oss_backend;
35use crate::util::find_dir_and_filename;
36
// Backend scheme names. `build_backend` upper-cases the scheme of the incoming
// location before comparing, so these act as case-insensitive identifiers.

/// Local filesystem backend; also the scheme `parse_url` reports for plain paths.
pub const FS_SCHEMA: &str = "FS";
/// Scheme dispatched to `build_s3_backend`; requires a host.
pub const S3_SCHEMA: &str = "S3";
/// Scheme dispatched to `build_oss_backend`; requires a host.
pub const OSS_SCHEMA: &str = "OSS";
/// Scheme dispatched to `build_gcs_backend`; requires a host.
pub const GCS_SCHEMA: &str = "GCS";
/// Scheme dispatched to `build_azblob_backend`; requires a host.
pub const AZBLOB_SCHEMA: &str = "AZBLOB";
42
43pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
45 #[cfg(windows)]
46 {
47 if handle_windows_path(url).is_some() {
49 return Ok((FS_SCHEMA.to_string(), None, url.to_string()));
50 }
51 }
52 let parsed_url = Url::parse(url);
53 match parsed_url {
54 Ok(url) => Ok((
55 url.scheme().to_string(),
56 url.host_str().map(|s| s.to_string()),
57 url.path().to_string(),
58 )),
59 Err(ParseError::RelativeUrlWithoutBase) => {
60 Ok((FS_SCHEMA.to_string(), None, url.to_string()))
61 }
62 Err(err) => Err(err).context(error::InvalidUrlSnafu { url }),
63 }
64}
65
66pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<ObjectStore> {
67 let (schema, host, path) = parse_url(url)?;
68 let (root, _) = find_dir_and_filename(&path);
69
70 match schema.to_uppercase().as_str() {
71 S3_SCHEMA => {
72 let host = host.context(error::EmptyHostPathSnafu {
73 url: url.to_string(),
74 })?;
75 Ok(build_s3_backend(&host, &root, connection)?)
76 }
77 OSS_SCHEMA => {
78 let host = host.context(error::EmptyHostPathSnafu {
79 url: url.to_string(),
80 })?;
81 Ok(build_oss_backend(&host, &root, connection)?)
82 }
83 GCS_SCHEMA => {
84 let host = host.context(error::EmptyHostPathSnafu {
85 url: url.to_string(),
86 })?;
87 Ok(build_gcs_backend(&host, &root, connection)?)
88 }
89 AZBLOB_SCHEMA => {
90 let host = host.context(error::EmptyHostPathSnafu {
91 url: url.to_string(),
92 })?;
93 Ok(build_azblob_backend(&host, &root, connection)?)
94 }
95 FS_SCHEMA => Ok(build_fs_backend(&root)?),
96
97 _ => error::UnsupportedBackendProtocolSnafu {
98 protocol: schema,
99 url,
100 }
101 .fail(),
102 }
103}
104
105lazy_static! {
106 static ref DISK_SYMBOL_PATTERN: Regex = Regex::new("^([A-Za-z]:/)").unwrap();
107}
108
109pub fn handle_windows_path(url: &str) -> Option<String> {
110 DISK_SYMBOL_PATTERN
111 .captures(url)
112 .map(|captures| captures[0].to_string())
113}
114
#[cfg(test)]
mod tests {
    use super::handle_windows_path;

    #[test]
    fn test_handle_windows_path() {
        // (input, expected drive prefix)
        let cases = [
            ("C:/to/path/file", Some("C:/")),
            // Lower-case drive letters are accepted as well.
            ("c:/data", Some("c:/")),
            ("https://google.com", None),
            ("s3://bucket/path/to", None),
            // Plain paths and the empty string carry no drive prefix.
            ("", None),
            ("relative/path", None),
            ("/abs/path", None),
            // Only a single letter may precede the colon.
            ("CD:/x", None),
        ];
        for (input, expected) in cases {
            assert_eq!(
                handle_windows_path(input),
                expected.map(str::to_string),
                "unexpected result for {:?}",
                input
            );
        }
    }
}