Skip to main content

cli/common/
object_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_base::secrets::{ExposeSecret, SecretString};
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
/// Conversion from a CLI-side field type into the field type of the target
/// connection struct.
///
/// The CLI can hold a secret as `Option<SecretString>`, so "flag omitted" and
/// "flag given with an empty value" remain distinguishable while parsing, whereas
/// the target connection struct stores a plain `SecretString`.
trait IntoField<T> {
    fn into_field(self) -> T;
}

/// Fields whose CLI type already equals the target type are moved across untouched.
impl<T> IntoField<T> for T {
    fn into_field(self) -> Self {
        self
    }
}
38
39/// Convert `Option<SecretString>` to `SecretString`, using default for None.
40impl IntoField<SecretString> for Option<SecretString> {
41    fn into_field(self) -> SecretString {
42        self.unwrap_or_default()
43    }
44}
45
/// Answers whether a CLI field carries no meaningful value.
///
/// Backend validation calls this on each required field of an enabled backend.
/// What counts as "empty":
/// - `String`: the empty string `""`
/// - `bool`: `false` (only `true` counts as provided)
/// - `Option<String>`: `None`, or `Some` holding an empty string
trait FieldValidator {
    /// Returns `true` when the field holds no meaningful value.
    fn is_empty(&self) -> bool;
}

/// A `String` field is empty exactly when it has no characters.
impl FieldValidator for String {
    fn is_empty(&self) -> bool {
        // Go through `str` so the call can never resolve back to this trait method.
        self.as_str().is_empty()
    }
}

/// A `bool` field counts as provided only when it is `true`.
impl FieldValidator for bool {
    fn is_empty(&self) -> bool {
        !*self
    }
}

/// An `Option<String>` field is empty when absent or when it holds `""`.
impl FieldValidator for Option<String> {
    fn is_empty(&self) -> bool {
        match self {
            Some(value) => value.is_empty(),
            None => true,
        }
    }
}
76
77/// Option<SecretString> fields: None or empty secret is empty
78/// For secrets, Some("") is treated as "not provided" for both checks
79impl FieldValidator for Option<SecretString> {
80    fn is_empty(&self) -> bool {
81        self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
82    }
83}
84
/// Defines a clap-parsable wrapper around a connection struct whose CLI flags all
/// share a common prefix, plus a `From` conversion back into that struct.
///
/// Arguments, in order:
/// - `$new_name`: name of the generated wrapper struct.
/// - `$prefix`: string literal pasted in front of every field name via `paste`
///   (e.g. `"s3-"` + `bucket` yields the Rust field `s3_bucket`, as used by the
///   `validate` impls).
/// - `$enable_flag`: id of the argument that every generated flag `requires`.
/// - `$base`: the connection type built by the generated `From` impl.
/// - field list: `field: Type [= default]`, each optionally preceded, in this exact
///   order, by `#[doc = ...]`, `#[alias = ...]` and `#[hide = ...]`.
///
/// The `From` impl builds `$base` with a struct literal naming exactly the listed
/// fields, so the list must cover every field of `$base`.
macro_rules! wrap_with_clap_prefix {
    (
        $new_name:ident, $prefix:literal, $enable_flag:literal, $base:ty, {
            $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $( #[hide = $hide:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
        }
    ) => {
        paste!{
            // Note: the `$doc` strings below become clap help text, so only `//`
            // comments are used inside this expansion.
            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
            pub struct $new_name {
                $(
                    $( #[doc = $doc] )?
                    $( #[clap(alias = $alias)] )?
                    $( #[clap(hide = $hide)] )?
                    // Long flag only; via `requires`, clap rejects it unless `$enable_flag` is present.
                    #[clap(long, requires = $enable_flag $(, default_value_t = $default )? )]
                    pub [<$prefix $field>]: $type,
                )*
            }

            impl From<$new_name> for $base {
                fn from(w: $new_name) -> Self {
                    Self {
                        // Use into_field() to handle Option<SecretString> -> SecretString conversion
                        $( $field: w.[<$prefix $field>].into_field() ),*
                    }
                }
            }
        }
    };
}
114
/// Macro for declarative backend validation.
///
/// # Validation Rules
///
/// When `enable` evaluates to `true`, every field listed in `required` must be non-empty
/// according to [`FieldValidator::is_empty`]. The optional `custom_validator` then runs and
/// may append further names to the list of missing fields. If any names were collected, the
/// macro makes the *enclosing function* return a `MissingConfig` error whose message reads
/// `"<name> <field>, <field> must be set when --<lowercased name> is enabled."`.
/// Otherwise (or when `enable` is `false`) the macro evaluates to `Ok(())`.
///
/// Note: When backend is disabled, clap's `requires` attribute ensures no configuration
/// fields can be provided at parse time.
///
/// # Syntax
///
/// ```ignore
/// validate_backend!(
///     enable: self.enable_s3,
///     name: "S3",
///     required: [(field1, "name1"), (field2, "name2"), ...],
///     custom_validator: |missing: &mut Vec<&str>| { ... }  // optional
/// )
/// ```
///
/// # Arguments
///
/// - `enable`: Boolean expression indicating if backend is enabled
/// - `name`: Human-readable backend name for error messages; lowercased to form the flag name
/// - `required`: Array of `(field_ref, field_name)` tuples; each `field_ref` must be a
///   reference to a type implementing `FieldValidator`, each `field_name` a `&str`
/// - `custom_validator`: Optional callable invoked with `&mut Vec<&str>` holding the names
///   collected so far. When passing a closure, annotate its parameter type as shown above:
///   Rust cannot infer it for method calls made inside the closure body.
///
/// # Example
///
/// ```ignore
/// validate_backend!(
///     enable: self.enable_s3,
///     name: "S3",
///     required: [
///         (&self.s3.s3_bucket, "bucket"),
///         (&self.s3.s3_access_key_id, "access key ID"),
///     ]
/// )
/// ```
macro_rules! validate_backend {
    (
        enable: $enable:expr,
        name: $backend_name:expr,
        required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
        $(, custom_validator: $custom_validator:expr)?
    ) => {{
        if $enable {
            // Names of required fields that are unset. The element type is spelled out so
            // the macro still compiles with an empty `required` list and no custom
            // validator, where no `push` would otherwise pin it down.
            let mut missing: Vec<&str> = Vec::new();
            $(
                if FieldValidator::is_empty($field) {
                    missing.push($field_name);
                }
            )*

            // Run custom validation if provided
            $(
                $custom_validator(&mut missing);
            )?

            if !missing.is_empty() {
                // `return` exits the function that invoked the macro.
                return Err(BoxedError::new(
                    error::MissingConfigSnafu {
                        msg: format!(
                            "{} {} must be set when --{} is enabled.",
                            $backend_name,
                            missing.join(", "),
                            $backend_name.to_lowercase()
                        ),
                    }
                    .build(),
                ));
            }
        }

        Ok(())
    }};
}
194
// Clap wrapper for `AzblobConnection`: each field becomes an `--azblob-<field>` flag
// that requires `--azblob` (arg id `enable_azblob`). Field order is significant: it
// fixes the generated struct's field order and hence its help/Debug output.
wrap_with_clap_prefix! {
    PrefixedAzblobConnection,
    "azblob-",
    "enable_azblob",
    AzblobConnection,
    {
        #[doc = "The container of the object store."]
        container: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The account name of the object store."]
        account_name: Option<SecretString>,
        #[doc = "The account key of the object store."]
        account_key: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
        // NOTE(review): unlike the account name/key, the SAS token is a plain
        // `Option<String>`, not a `SecretString`. If `AzblobConnection`'s `Debug`
        // prints it, the `{:?}` log in the generated `build_azblob` would expose it — confirm.
        #[doc = "The SAS token of the object store."]
        sas_token: Option<String>,
    }
}
215
216impl PrefixedAzblobConnection {
217    pub fn validate(&self) -> Result<(), BoxedError> {
218        validate_backend!(
219            enable: true,
220            name: "AzBlob",
221            required: [
222                (&self.azblob_container, "container"),
223                (&self.azblob_endpoint, "endpoint"),
224            ]
225        )
226    }
227}
228
// Clap wrapper for `S3Connection`: each field becomes an `--s3-<field>` flag that
// requires `--s3` (arg id `enable_s3`). The two bool switches default to `false`.
// Field order is significant: it fixes the generated struct's field order and hence
// its help/Debug output.
wrap_with_clap_prefix! {
    PrefixedS3Connection,
    "s3-",
    "enable_s3",
    S3Connection,
    {
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The access key ID of the object store."]
        access_key_id: Option<SecretString>,
        #[doc = "The secret access key of the object store."]
        secret_access_key: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: Option<String>,
        #[doc = "The region of the object store."]
        region: Option<String>,
        #[doc = "Enable virtual host style for the object store."]
        enable_virtual_host_style: bool = Default::default(),
        #[doc = "Disable EC2 metadata service for the object store."]
        disable_ec2_metadata: bool = Default::default(),
    }
}
253
254impl PrefixedS3Connection {
255    pub fn validate(&self) -> Result<(), BoxedError> {
256        validate_backend!(
257            enable: true,
258            name: "S3",
259            required: [
260                (&self.s3_bucket, "bucket"),
261                (&self.s3_region, "region"),
262            ]
263        )
264    }
265}
266
// Clap wrapper for `OssConnection`: each field becomes an `--oss-<field>` flag that
// requires `--oss` (arg id `enable_oss`). Field order is significant: it fixes the
// generated struct's field order and hence its help/Debug output.
wrap_with_clap_prefix! {
    PrefixedOssConnection,
    "oss-",
    "enable_oss",
    OssConnection,
    {
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The access key ID of the object store."]
        access_key_id: Option<SecretString>,
        #[doc = "The access key secret of the object store."]
        access_key_secret: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
    }
}
285
286impl PrefixedOssConnection {
287    pub fn validate(&self) -> Result<(), BoxedError> {
288        validate_backend!(
289            enable: true,
290            name: "OSS",
291            required: [
292                (&self.oss_bucket, "bucket"),
293                (&self.oss_access_key_id, "access key ID"),
294                (&self.oss_access_key_secret, "access key secret"),
295                (&self.oss_endpoint, "endpoint"),
296            ]
297        )
298    }
299}
300
// Clap wrapper for `GcsConnection`: each field becomes a `--gcs-<field>` flag that
// requires `--gcs` (arg id `enable_gcs`). Field order is significant: it fixes the
// generated struct's field order and hence its help/Debug output.
wrap_with_clap_prefix! {
    PrefixedGcsConnection,
    "gcs-",
    "enable_gcs",
    GcsConnection,
    {
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The scope of the object store."]
        scope: String = Default::default(),
        // Still accepted on the command line, but `hide = true` omits it from `--help`.
        #[doc = "The credential path of the object store."]
        #[hide = true]
        credential_path: Option<SecretString>,
        #[doc = "The credential of the object store."]
        credential: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
    }
}
322
323impl PrefixedGcsConnection {
324    pub fn validate(&self) -> Result<(), BoxedError> {
325        validate_backend!(
326            enable: true,
327            name: "GCS",
328            required: [
329                (&self.gcs_bucket, "bucket"),
330                (&self.gcs_root, "root"),
331                (&self.gcs_scope, "scope"),
332            ]
333            // No custom_validator needed: GCS supports Application Default Credentials (ADC)
334            // where neither credential_path nor credential is required.
335            // Endpoint is also optional (defaults to https://storage.googleapis.com).
336        )
337    }
338}
339
/// Common config for object store.
///
/// # Dependency Enforcement
///
/// Each backend's configuration fields (e.g., `--s3-bucket`) requires its corresponding
/// enable flag (e.g., `--s3`) to be present. This is enforced by `clap` at parse time
/// using the `requires` attribute.
///
/// For example, attempting to use `--s3-bucket my-bucket` without `--s3` will result in:
/// ```text
/// error: The argument '--s3-bucket <BUCKET>' requires '--s3'
/// ```
///
/// This ensures that users cannot accidentally provide backend-specific configuration
/// without explicitly enabling that backend.
// NOTE(review): clap may use the `///` text above (and on the fields) as help text
// for this derived parser, so it is left untouched. The quoted error message may not
// match the wording of the clap version actually in use — confirm before relying on it.
//
// The `storage_backend` group collects the four enable flags below. `multiple(false)`
// lets at most one of them be given, and `required(false)` also accepts none of them
// (in which case `build` returns `Ok(None)`).
#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
pub struct ObjectStoreConfig {
    /// Whether to use S3 object store.
    #[clap(long = "s3", group = "storage_backend")]
    pub enable_s3: bool,

    // Flattened `--s3-*` flags; each requires `--s3`.
    #[clap(flatten)]
    pub s3: PrefixedS3Connection,

    /// Whether to use OSS.
    #[clap(long = "oss", group = "storage_backend")]
    pub enable_oss: bool,

    // Flattened `--oss-*` flags; each requires `--oss`.
    #[clap(flatten)]
    pub oss: PrefixedOssConnection,

    /// Whether to use GCS.
    #[clap(long = "gcs", group = "storage_backend")]
    pub enable_gcs: bool,

    // Flattened `--gcs-*` flags; each requires `--gcs`.
    #[clap(flatten)]
    pub gcs: PrefixedGcsConnection,

    /// Whether to use Azure Blob.
    #[clap(long = "azblob", group = "storage_backend")]
    pub enable_azblob: bool,

    // Flattened `--azblob-*` flags; each requires `--azblob`.
    #[clap(flatten)]
    pub azblob: PrefixedAzblobConnection,
}
386
387/// Creates a new file system object store.
388pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
389    let builder = Fs::default().root(root);
390    let object_store = ObjectStore::new(builder)
391        .context(error::InitBackendSnafu)
392        .map_err(BoxedError::new)?
393        .finish();
394
395    Ok(with_instrument_layers(object_store, false))
396}
397
/// Generates `pub fn $method(&self) -> Result<ObjectStore, BoxedError>`.
///
/// The generated method converts a clone of `self.$field` into `$conn_type` and logs
/// it at info level. It then builds a `$service_type` backend from that connection and
/// wraps the resulting store with the retry layers, then the instrument layers.
///
/// NOTE(review): the connection is logged with `{:?}`. Secrets stored as `SecretString`
/// are presumably redacted by its `Debug`, but plain `String` fields may not be — confirm.
macro_rules! gen_object_store_builder {
    ($method:ident, $field:ident, $conn_type:ty, $service_type:ty) => {
        pub fn $method(&self) -> Result<ObjectStore, BoxedError> {
            let connection: $conn_type = self.$field.clone().into();
            common_telemetry::info!(
                "Building object store with {}: {:?}",
                stringify!($field),
                connection
            );

            let backend = <$service_type>::from(&connection);
            let store = ObjectStore::new(backend)
                .context(error::InitBackendSnafu)
                .map_err(BoxedError::new)?
                .finish();

            // Retry layers are applied before the instrument layers.
            let store = with_retry_layers(store);
            Ok(with_instrument_layers(store, false))
        }
    };
}
418
419impl ObjectStoreConfig {
420    gen_object_store_builder!(build_s3, s3, S3Connection, S3);
421
422    gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
423
424    gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
425
426    gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
427
428    pub fn validate(&self) -> Result<(), BoxedError> {
429        if self.enable_s3 {
430            self.s3.validate()?;
431        }
432        if self.enable_oss {
433            self.oss.validate()?;
434        }
435        if self.enable_gcs {
436            self.gcs.validate()?;
437        }
438        if self.enable_azblob {
439            self.azblob.validate()?;
440        }
441        Ok(())
442    }
443
444    /// Builds the object store from the config.
445    pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
446        self.validate()?;
447
448        if self.enable_s3 {
449            self.build_s3().map(Some)
450        } else if self.enable_oss {
451            self.build_oss().map(Some)
452        } else if self.enable_gcs {
453            self.build_gcs().map(Some)
454        } else if self.enable_azblob {
455            self.build_azblob().map(Some)
456        } else {
457            Ok(None)
458        }
459    }
460}