Skip to main content

cli/data/
storage_export.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16
17use common_base::secrets::{ExposeSecret, SecretString};
18use common_error::ext::BoxedError;
19
20use crate::common::{
21    PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection, PrefixedS3Connection,
22};
23
24/// Helper function to extract secret string from Option<SecretString>.
25/// Returns empty string if None.
26fn expose_optional_secret(secret: &Option<SecretString>) -> &str {
27    secret
28        .as_ref()
29        .map(|s| s.expose_secret().as_str())
30        .unwrap_or("")
31}
32
/// Turns a storage root into a URI path segment.
///
/// An empty `root` produces an empty string; any other value is returned with
/// a single `/` prepended (the value itself is not otherwise altered).
fn format_root_path(root: &str) -> String {
    match root {
        "" => String::new(),
        non_empty => format!("/{}", non_empty),
    }
}
41
/// Replaces every occurrence of each non-empty secret in `sql` with `[REDACTED]`.
///
/// The text is scanned once from left to right. At each position the longest
/// secret that starts there is masked, which guarantees that:
///
/// * a secret that is a prefix/substring of another secret cannot cause only
///   part of the longer one to be masked (leaving its tail readable), and
/// * the inserted `[REDACTED]` placeholder is never re-scanned, so a short
///   secret such as `RED` cannot corrupt an earlier replacement.
///
/// Empty entries in `secrets` are ignored. When no usable secret is given the
/// input is returned unchanged.
fn mask_secrets(sql: String, secrets: &[&str]) -> String {
    const PLACEHOLDER: &str = "[REDACTED]";

    let mut ordered: Vec<&str> = secrets
        .iter()
        .copied()
        .filter(|secret| !secret.is_empty())
        .collect();
    if ordered.is_empty() {
        return sql;
    }
    // Longest first, so the first hit at a position is the longest match there.
    ordered.sort_by(|a, b| b.len().cmp(&a.len()));

    let mut masked = String::with_capacity(sql.len());
    let mut rest = sql.as_str();
    while let Some(ch) = rest.chars().next() {
        if let Some(secret) = ordered.iter().find(|s| rest.starts_with(**s)) {
            masked.push_str(PLACEHOLDER);
            // `starts_with` succeeded, so `secret.len()` is a valid char boundary.
            rest = &rest[secret.len()..];
        } else {
            masked.push(ch);
            rest = &rest[ch.len_utf8()..];
        }
    }
    masked
}
51
52/// Helper function to format storage URI.
53fn format_uri(scheme: &str, bucket: &str, root: &str, path: &str) -> String {
54    let root = format_root_path(root);
55    format!("{}://{}{}/{}", scheme, bucket, root, path)
56}
57
/// Behaviour shared by every destination that exported data can be written to.
///
/// Implementors must be `Send + Sync`.
pub trait StorageExport: Send + Sync {
    /// Builds the target location used by the `COPY DATABASE` command for the
    /// given `catalog` and `schema`.
    ///
    /// Returns `(path, connection_string)`, where `connection_string` carries
    /// the `CONNECTION` clause to append to the statement.
    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String);

    /// Renders the full location of `file_path` in this storage, intended for
    /// log output.
    fn format_output_path(&self, file_path: &str) -> String;

    /// Returns a copy of `sql` with sensitive values hidden so the statement
    /// can be logged safely.
    fn mask_sensitive_info(&self, sql: &str) -> String;
}
70
/// Declares a cloneable backend struct `$name` that wraps a connection
/// configuration of type `$config`.
///
/// The generated `new` constructor calls `validate()` on the configuration and
/// propagates its error, so a backend value only exists for a configuration
/// that passed validation.
macro_rules! define_backend {
    ($name:ident, $config:ty) => {
        #[derive(Clone)]
        pub struct $name {
            config: $config,
        }

        impl $name {
            pub fn new(config: $config) -> Result<Self, BoxedError> {
                let backend = Self { config };
                // Reject an invalid configuration before handing the backend out.
                backend.config.validate()?;
                Ok(backend)
            }
        }
    };
}
86
/// Storage backend that exports to a directory on the local file system.
#[derive(Clone)]
pub struct FsBackend {
    /// Directory under which exported data is placed.
    output_dir: String,
}

impl FsBackend {
    /// Creates a backend rooted at `output_dir`. No validation is performed here.
    pub fn new(output_dir: String) -> Self {
        FsBackend { output_dir }
    }
}
98
99impl StorageExport for FsBackend {
100    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
101        if self.output_dir.is_empty() {
102            unreachable!("output_dir must be set when not using remote storage")
103        }
104        let path = PathBuf::from(&self.output_dir)
105            .join(catalog)
106            .join(format!("{schema}/"))
107            .to_string_lossy()
108            .to_string();
109        (path, String::new())
110    }
111
112    fn format_output_path(&self, file_path: &str) -> String {
113        format!("{}/{}", self.output_dir, file_path)
114    }
115
116    fn mask_sensitive_info(&self, sql: &str) -> String {
117        sql.to_string()
118    }
119}
120
121define_backend!(S3Backend, PrefixedS3Connection);
122
123impl StorageExport for S3Backend {
124    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
125        let s3_path = format_uri(
126            "s3",
127            &self.config.s3_bucket,
128            &self.config.s3_root,
129            &format!("{}/{}/", catalog, schema),
130        );
131
132        let mut connection_options = vec![
133            format!(
134                "ACCESS_KEY_ID='{}'",
135                expose_optional_secret(&self.config.s3_access_key_id)
136            ),
137            format!(
138                "SECRET_ACCESS_KEY='{}'",
139                expose_optional_secret(&self.config.s3_secret_access_key)
140            ),
141        ];
142
143        if let Some(region) = &self.config.s3_region {
144            connection_options.push(format!("REGION='{}'", region));
145        }
146
147        if let Some(endpoint) = &self.config.s3_endpoint {
148            connection_options.push(format!("ENDPOINT='{}'", endpoint));
149        }
150
151        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
152        (s3_path, connection_str)
153    }
154
155    fn format_output_path(&self, file_path: &str) -> String {
156        format_uri(
157            "s3",
158            &self.config.s3_bucket,
159            &self.config.s3_root,
160            file_path,
161        )
162    }
163
164    fn mask_sensitive_info(&self, sql: &str) -> String {
165        mask_secrets(
166            sql.to_string(),
167            &[
168                expose_optional_secret(&self.config.s3_access_key_id),
169                expose_optional_secret(&self.config.s3_secret_access_key),
170            ],
171        )
172    }
173}
174
175define_backend!(OssBackend, PrefixedOssConnection);
176
177impl StorageExport for OssBackend {
178    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
179        let oss_path = format_uri(
180            "oss",
181            &self.config.oss_bucket,
182            &self.config.oss_root,
183            &format!("{}/{}/", catalog, schema),
184        );
185
186        let connection_options = [
187            format!(
188                "ACCESS_KEY_ID='{}'",
189                expose_optional_secret(&self.config.oss_access_key_id)
190            ),
191            format!(
192                "ACCESS_KEY_SECRET='{}'",
193                expose_optional_secret(&self.config.oss_access_key_secret)
194            ),
195        ];
196
197        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
198        (oss_path, connection_str)
199    }
200
201    fn format_output_path(&self, file_path: &str) -> String {
202        format_uri(
203            "oss",
204            &self.config.oss_bucket,
205            &self.config.oss_root,
206            file_path,
207        )
208    }
209
210    fn mask_sensitive_info(&self, sql: &str) -> String {
211        mask_secrets(
212            sql.to_string(),
213            &[
214                expose_optional_secret(&self.config.oss_access_key_id),
215                expose_optional_secret(&self.config.oss_access_key_secret),
216            ],
217        )
218    }
219}
220
221define_backend!(GcsBackend, PrefixedGcsConnection);
222
223impl StorageExport for GcsBackend {
224    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
225        let gcs_path = format_uri(
226            "gcs",
227            &self.config.gcs_bucket,
228            &self.config.gcs_root,
229            &format!("{}/{}/", catalog, schema),
230        );
231
232        let mut connection_options = Vec::new();
233        let credential_path = expose_optional_secret(&self.config.gcs_credential_path);
234        if !credential_path.is_empty() {
235            connection_options.push(format!("CREDENTIAL_PATH='{}'", credential_path));
236        }
237
238        let credential = expose_optional_secret(&self.config.gcs_credential);
239        if !credential.is_empty() {
240            connection_options.push(format!("CREDENTIAL='{}'", credential));
241        }
242
243        if !self.config.gcs_endpoint.is_empty() {
244            connection_options.push(format!("ENDPOINT='{}'", self.config.gcs_endpoint));
245        }
246
247        let connection_str = if connection_options.is_empty() {
248            String::new()
249        } else {
250            format!(" CONNECTION ({})", connection_options.join(", "))
251        };
252
253        (gcs_path, connection_str)
254    }
255
256    fn format_output_path(&self, file_path: &str) -> String {
257        format_uri(
258            "gcs",
259            &self.config.gcs_bucket,
260            &self.config.gcs_root,
261            file_path,
262        )
263    }
264
265    fn mask_sensitive_info(&self, sql: &str) -> String {
266        mask_secrets(
267            sql.to_string(),
268            &[expose_optional_secret(&self.config.gcs_credential)],
269        )
270    }
271}
272
273define_backend!(AzblobBackend, PrefixedAzblobConnection);
274
275impl StorageExport for AzblobBackend {
276    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
277        let azblob_path = format_uri(
278            "azblob",
279            &self.config.azblob_container,
280            &self.config.azblob_root,
281            &format!("{}/{}/", catalog, schema),
282        );
283
284        let mut connection_options = vec![
285            format!(
286                "ACCOUNT_NAME='{}'",
287                expose_optional_secret(&self.config.azblob_account_name)
288            ),
289            format!(
290                "ACCOUNT_KEY='{}'",
291                expose_optional_secret(&self.config.azblob_account_key)
292            ),
293        ];
294
295        if let Some(sas_token) = &self.config.azblob_sas_token {
296            connection_options.push(format!("SAS_TOKEN='{}'", sas_token));
297        }
298
299        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
300        (azblob_path, connection_str)
301    }
302
303    fn format_output_path(&self, file_path: &str) -> String {
304        format_uri(
305            "azblob",
306            &self.config.azblob_container,
307            &self.config.azblob_root,
308            file_path,
309        )
310    }
311
312    fn mask_sensitive_info(&self, sql: &str) -> String {
313        mask_secrets(
314            sql.to_string(),
315            &[
316                expose_optional_secret(&self.config.azblob_account_name),
317                expose_optional_secret(&self.config.azblob_account_key),
318            ],
319        )
320    }
321}
322
323#[derive(Clone)]
324pub enum StorageType {
325    Fs(FsBackend),
326    S3(S3Backend),
327    Oss(OssBackend),
328    Gcs(GcsBackend),
329    Azblob(AzblobBackend),
330}
331
332impl StorageExport for StorageType {
333    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
334        match self {
335            StorageType::Fs(backend) => backend.get_storage_path(catalog, schema),
336            StorageType::S3(backend) => backend.get_storage_path(catalog, schema),
337            StorageType::Oss(backend) => backend.get_storage_path(catalog, schema),
338            StorageType::Gcs(backend) => backend.get_storage_path(catalog, schema),
339            StorageType::Azblob(backend) => backend.get_storage_path(catalog, schema),
340        }
341    }
342
343    fn format_output_path(&self, file_path: &str) -> String {
344        match self {
345            StorageType::Fs(backend) => backend.format_output_path(file_path),
346            StorageType::S3(backend) => backend.format_output_path(file_path),
347            StorageType::Oss(backend) => backend.format_output_path(file_path),
348            StorageType::Gcs(backend) => backend.format_output_path(file_path),
349            StorageType::Azblob(backend) => backend.format_output_path(file_path),
350        }
351    }
352
353    fn mask_sensitive_info(&self, sql: &str) -> String {
354        match self {
355            StorageType::Fs(backend) => backend.mask_sensitive_info(sql),
356            StorageType::S3(backend) => backend.mask_sensitive_info(sql),
357            StorageType::Oss(backend) => backend.mask_sensitive_info(sql),
358            StorageType::Gcs(backend) => backend.mask_sensitive_info(sql),
359            StorageType::Azblob(backend) => backend.mask_sensitive_info(sql),
360        }
361    }
362}
363
364impl StorageType {
365    /// Returns true if the storage backend is remote (not local filesystem).
366    pub fn is_remote_storage(&self) -> bool {
367        !matches!(self, StorageType::Fs(_))
368    }
369}