common_datasource/
object_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod fs;
16pub mod oss;
17pub mod s3;
18
19use std::collections::HashMap;
20
21use lazy_static::lazy_static;
22use object_store::ObjectStore;
23use regex::Regex;
24use snafu::{OptionExt, ResultExt};
25use url::{ParseError, Url};
26
27use self::fs::build_fs_backend;
28use self::s3::build_s3_backend;
29use crate::error::{self, Result};
30use crate::object_store::oss::build_oss_backend;
31use crate::util::find_dir_and_filename;
32
/// Schema name that selects the local file-system backend in `build_backend`;
/// also the schema `parse_url` reports for inputs it treats as plain paths.
pub const FS_SCHEMA: &str = "FS";
/// Schema name that selects the S3 backend in `build_backend`.
pub const S3_SCHEMA: &str = "S3";
/// Schema name that selects the OSS backend in `build_backend`.
pub const OSS_SCHEMA: &str = "OSS";
36
37/// Returns `(schema, Option<host>, path)`
38pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
39    #[cfg(windows)]
40    {
41        // On Windows, the url may start with `C:/`.
42        if handle_windows_path(url).is_some() {
43            return Ok((FS_SCHEMA.to_string(), None, url.to_string()));
44        }
45    }
46    let parsed_url = Url::parse(url);
47    match parsed_url {
48        Ok(url) => Ok((
49            url.scheme().to_string(),
50            url.host_str().map(|s| s.to_string()),
51            url.path().to_string(),
52        )),
53        Err(ParseError::RelativeUrlWithoutBase) => {
54            Ok((FS_SCHEMA.to_string(), None, url.to_string()))
55        }
56        Err(err) => Err(err).context(error::InvalidUrlSnafu { url }),
57    }
58}
59
60pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<ObjectStore> {
61    let (schema, host, path) = parse_url(url)?;
62    let (root, _) = find_dir_and_filename(&path);
63
64    match schema.to_uppercase().as_str() {
65        S3_SCHEMA => {
66            let host = host.context(error::EmptyHostPathSnafu {
67                url: url.to_string(),
68            })?;
69            Ok(build_s3_backend(&host, &root, connection)?)
70        }
71        OSS_SCHEMA => {
72            let host = host.context(error::EmptyHostPathSnafu {
73                url: url.to_string(),
74            })?;
75            Ok(build_oss_backend(&host, &root, connection)?)
76        }
77        FS_SCHEMA => Ok(build_fs_backend(&root)?),
78
79        _ => error::UnsupportedBackendProtocolSnafu {
80            protocol: schema,
81            url,
82        }
83        .fail(),
84    }
85}
86
// Lazily compiled pattern used by `handle_windows_path`.
lazy_static! {
    // Anchored at the start of the input: one ASCII letter followed by `:/` (e.g. `C:/`).
    static ref DISK_SYMBOL_PATTERN: Regex = Regex::new("^([A-Za-z]:/)").unwrap();
}
90
91pub fn handle_windows_path(url: &str) -> Option<String> {
92    DISK_SYMBOL_PATTERN
93        .captures(url)
94        .map(|captures| captures[0].to_string())
95}
96
#[cfg(test)]
mod tests {
    use super::handle_windows_path;

    /// A drive-letter prefix is extracted; ordinary URLs produce `None`.
    #[test]
    fn test_handle_windows_path() {
        let cases = [
            ("C:/to/path/file", Some("C:/")),
            ("https://google.com", None),
            ("s3://bucket/path/to", None),
        ];

        for &(input, expected) in &cases {
            assert_eq!(handle_windows_path(input).as_deref(), expected);
        }
    }
}