cli/common/
object_store.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_base::secrets::SecretString;
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
/// Generates a `clap`-parsable wrapper struct around an existing connection
/// type, giving every field a common name prefix.
///
/// Invocation shape:
///
/// ```ignore
/// wrap_with_clap_prefix! {
///     NewStructName,
///     "prefix-",
///     BaseType,
///     {
///         #[doc = "help text"]      // optional
///         #[alias = "other-name"]   // optional
///         field: Type = default,    // `= default` is optional
///     }
/// }
/// ```
///
/// The expansion contains:
///
/// * `pub struct NewStructName` deriving `clap::Parser`, `Debug`, `Clone`,
///   `PartialEq` and `Default`, with one private field per listed field. Each
///   field is named by pasting the prefix in front of the original field name
///   and is annotated with `#[clap(long)]`, plus `default_value_t = default`
///   when a default was supplied.
/// * `impl From<NewStructName> for BaseType`, which moves every prefixed field
///   into the field of the same (unprefixed) name on `BaseType`.
///
/// NOTE(review): the prefixes used in this file end in `-`; this relies on
/// `paste` turning the hyphen of a pasted string literal into `_` so that the
/// result is a valid identifier — confirm against the `paste` documentation.
macro_rules! wrap_with_clap_prefix {
    (
        $new_name:ident, $prefix:literal, $base:ty, {
            $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
        }
    ) => {
        paste!{
            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
            pub struct $new_name {
                $(
                    // The optional doc string is forwarded unchanged to the generated field.
                    $( #[doc = $doc] )?
                    $( #[clap(alias = $alias)] )?
                    #[clap(long $(, default_value_t = $default )? )]
                    [<$prefix $field>]: $type,
                )*
            }

            impl From<$new_name> for $base {
                fn from(w: $new_name) -> Self {
                    // Field-by-field move: prefixed wrapper field -> unprefixed base field.
                    Self {
                        $( $field: w.[<$prefix $field>] ),*
                    }
                }
            }
        }
    };
}
52
53wrap_with_clap_prefix! {
54    PrefixedAzblobConnection,
55    "azblob-",
56    AzblobConnection,
57    {
58        #[doc = "The container of the object store."]
59        container: String = Default::default(),
60        #[doc = "The root of the object store."]
61        root: String = Default::default(),
62        #[doc = "The account name of the object store."]
63        account_name: SecretString = Default::default(),
64        #[doc = "The account key of the object store."]
65        account_key: SecretString = Default::default(),
66        #[doc = "The endpoint of the object store."]
67        endpoint: String = Default::default(),
68        #[doc = "The SAS token of the object store."]
69        sas_token: Option<String>,
70    }
71}
72
73wrap_with_clap_prefix! {
74    PrefixedS3Connection,
75    "s3-",
76    S3Connection,
77    {
78        #[doc = "The bucket of the object store."]
79        bucket: String = Default::default(),
80        #[doc = "The root of the object store."]
81        root: String = Default::default(),
82        #[doc = "The access key ID of the object store."]
83        access_key_id: SecretString = Default::default(),
84        #[doc = "The secret access key of the object store."]
85        secret_access_key: SecretString = Default::default(),
86        #[doc = "The endpoint of the object store."]
87        endpoint: Option<String>,
88        #[doc = "The region of the object store."]
89        region: Option<String>,
90        #[doc = "Enable virtual host style for the object store."]
91        enable_virtual_host_style: bool = Default::default(),
92    }
93}
94
95wrap_with_clap_prefix! {
96    PrefixedOssConnection,
97    "oss-",
98    OssConnection,
99    {
100        #[doc = "The bucket of the object store."]
101        bucket: String = Default::default(),
102        #[doc = "The root of the object store."]
103        root: String = Default::default(),
104        #[doc = "The access key ID of the object store."]
105        access_key_id: SecretString = Default::default(),
106        #[doc = "The access key secret of the object store."]
107        access_key_secret: SecretString = Default::default(),
108        #[doc = "The endpoint of the object store."]
109        endpoint: String = Default::default(),
110    }
111}
112
113wrap_with_clap_prefix! {
114    PrefixedGcsConnection,
115    "gcs-",
116    GcsConnection,
117    {
118        #[doc = "The root of the object store."]
119        root: String = Default::default(),
120        #[doc = "The bucket of the object store."]
121        bucket: String = Default::default(),
122        #[doc = "The scope of the object store."]
123        scope: String = Default::default(),
124        #[doc = "The credential path of the object store."]
125        credential_path: SecretString = Default::default(),
126        #[doc = "The credential of the object store."]
127        credential: SecretString = Default::default(),
128        #[doc = "The endpoint of the object store."]
129        endpoint: String = Default::default(),
130    }
131}
132
133/// common config for object store.
134#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
135pub struct ObjectStoreConfig {
136    /// Whether to use S3 object store.
137    #[clap(long, alias = "s3")]
138    pub enable_s3: bool,
139
140    #[clap(flatten)]
141    pub s3: PrefixedS3Connection,
142
143    /// Whether to use OSS.
144    #[clap(long, alias = "oss")]
145    pub enable_oss: bool,
146
147    #[clap(flatten)]
148    pub oss: PrefixedOssConnection,
149
150    /// Whether to use GCS.
151    #[clap(long, alias = "gcs")]
152    pub enable_gcs: bool,
153
154    #[clap(flatten)]
155    pub gcs: PrefixedGcsConnection,
156
157    /// Whether to use Azure Blob.
158    #[clap(long, alias = "azblob")]
159    pub enable_azblob: bool,
160
161    #[clap(flatten)]
162    pub azblob: PrefixedAzblobConnection,
163}
164
165/// Creates a new file system object store.
166pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
167    let builder = Fs::default().root(root);
168    let object_store = ObjectStore::new(builder)
169        .context(error::InitBackendSnafu)
170        .map_err(BoxedError::new)?
171        .finish();
172
173    Ok(with_instrument_layers(object_store, false))
174}
175
176impl ObjectStoreConfig {
177    /// Builds the object store from the config.
178    pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
179        let object_store = if self.enable_s3 {
180            let s3 = S3Connection::from(self.s3.clone());
181            common_telemetry::info!("Building object store with s3: {:?}", s3);
182            Some(
183                ObjectStore::new(S3::from(&s3))
184                    .context(error::InitBackendSnafu)
185                    .map_err(BoxedError::new)?
186                    .finish(),
187            )
188        } else if self.enable_oss {
189            let oss = OssConnection::from(self.oss.clone());
190            common_telemetry::info!("Building object store with oss: {:?}", oss);
191            Some(
192                ObjectStore::new(Oss::from(&oss))
193                    .context(error::InitBackendSnafu)
194                    .map_err(BoxedError::new)?
195                    .finish(),
196            )
197        } else if self.enable_gcs {
198            let gcs = GcsConnection::from(self.gcs.clone());
199            common_telemetry::info!("Building object store with gcs: {:?}", gcs);
200            Some(
201                ObjectStore::new(Gcs::from(&gcs))
202                    .context(error::InitBackendSnafu)
203                    .map_err(BoxedError::new)?
204                    .finish(),
205            )
206        } else if self.enable_azblob {
207            let azblob = AzblobConnection::from(self.azblob.clone());
208            common_telemetry::info!("Building object store with azblob: {:?}", azblob);
209            Some(
210                ObjectStore::new(Azblob::from(&azblob))
211                    .context(error::InitBackendSnafu)
212                    .map_err(BoxedError::new)?
213                    .finish(),
214            )
215        } else {
216            None
217        };
218
219        let object_store = object_store
220            .map(|object_store| with_instrument_layers(with_retry_layers(object_store), false));
221
222        Ok(object_store)
223    }
224}