1use std::path::PathBuf;
16
17use common_base::secrets::{ExposeSecret, SecretString};
18use common_error::ext::BoxedError;
19
20use crate::common::{
21 PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection, PrefixedS3Connection,
22};
23
24fn expose_optional_secret(secret: &Option<SecretString>) -> &str {
27 secret
28 .as_ref()
29 .map(|s| s.expose_secret().as_str())
30 .unwrap_or("")
31}
32
/// Turns a root into a URI path segment.
///
/// An empty root yields an empty string; any other root is returned with a
/// single `/` prepended. The root itself is not otherwise modified.
fn format_root_path(root: &str) -> String {
    match root {
        "" => String::new(),
        non_empty => format!("/{}", non_empty),
    }
}
41
/// Replaces every occurrence of the given secrets in `sql` with `[REDACTED]`.
///
/// Empty secrets are ignored. The text is scanned once from left to right and,
/// at each position, the longest matching secret wins. Unlike replacing the
/// secrets one after another, this guarantees that
///
/// - a secret that is a prefix of another secret cannot leave the tail of the
///   longer one readable, and
/// - a secret that happens to occur inside the `[REDACTED]` marker cannot
///   mangle markers that were already inserted.
fn mask_secrets(sql: String, secrets: &[&str]) -> String {
    const PLACEHOLDER: &str = "[REDACTED]";

    let mut candidates: Vec<&str> = secrets
        .iter()
        .copied()
        .filter(|secret| !secret.is_empty())
        .collect();
    if candidates.is_empty() {
        return sql;
    }
    // Longest first, so `find` below yields the longest secret matching at a position.
    candidates.sort_by(|a, b| b.len().cmp(&a.len()));

    let mut masked = String::with_capacity(sql.len());
    let mut rest = sql.as_str();
    while let Some(ch) = rest.chars().next() {
        match candidates.iter().find(|secret| rest.starts_with(**secret)) {
            Some(secret) => {
                masked.push_str(PLACEHOLDER);
                rest = &rest[secret.len()..];
            }
            None => {
                // Advance by one whole character to stay on a UTF-8 boundary.
                masked.push(ch);
                rest = &rest[ch.len_utf8()..];
            }
        }
    }
    masked
}
51
52fn format_uri(scheme: &str, bucket: &str, root: &str, path: &str) -> String {
54 let root = format_root_path(root);
55 format!("{}://{}{}/{}", scheme, bucket, root, path)
56}
57
/// Common interface of the export storage backends.
///
/// Implementors must be `Send + Sync`.
pub trait StorageExport: Send + Sync {
    /// Returns the storage location for `catalog`/`schema` together with the
    /// connection clause that belongs to it.
    ///
    /// The implementations in this file return a path or URI ending in
    /// `{catalog}/{schema}/` as the first element, and either an empty string
    /// or a ` CONNECTION (...)` clause as the second.
    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String);

    /// Returns the full output location of `file_path` on this backend.
    fn format_output_path(&self, file_path: &str) -> String;

    /// Returns a copy of `sql` with the backend's secret values masked.
    fn mask_sensitive_info(&self, sql: &str) -> String;
}
70
/// Declares a storage backend that wraps a validated connection config.
///
/// `define_backend!(Name, Config)` expands to a `Clone`-able `pub struct Name`
/// with a private `config: Config` field, plus `Name::new(config)`, which
/// calls `config.validate()` and propagates its error before constructing
/// the backend.
macro_rules! define_backend {
    ($backend:ident, $connection:ty) => {
        #[derive(Clone)]
        pub struct $backend {
            config: $connection,
        }

        impl $backend {
            pub fn new(config: $connection) -> Result<Self, BoxedError> {
                // Reject an invalid configuration before it can be stored.
                config.validate()?;
                Ok($backend { config })
            }
        }
    };
}
86
/// Export backend that targets a directory path.
#[derive(Clone)]
pub struct FsBackend {
    /// Base directory that output paths are built from.
    output_dir: String,
}

impl FsBackend {
    /// Creates a backend for `output_dir`.
    ///
    /// The value is stored as given; nothing is validated here.
    pub fn new(output_dir: String) -> Self {
        FsBackend { output_dir }
    }
}
98
99impl StorageExport for FsBackend {
100 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
101 if self.output_dir.is_empty() {
102 unreachable!("output_dir must be set when not using remote storage")
103 }
104 let path = PathBuf::from(&self.output_dir)
105 .join(catalog)
106 .join(format!("{schema}/"))
107 .to_string_lossy()
108 .to_string();
109 (path, String::new())
110 }
111
112 fn format_output_path(&self, file_path: &str) -> String {
113 format!("{}/{}", self.output_dir, file_path)
114 }
115
116 fn mask_sensitive_info(&self, sql: &str) -> String {
117 sql.to_string()
118 }
119}
120
// Generates `S3Backend`: a `Clone` wrapper around `PrefixedS3Connection` whose
// `new` constructor runs `config.validate()` first (see `define_backend!`).
define_backend!(S3Backend, PrefixedS3Connection);
122
123impl StorageExport for S3Backend {
124 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
125 let s3_path = format_uri(
126 "s3",
127 &self.config.s3_bucket,
128 &self.config.s3_root,
129 &format!("{}/{}/", catalog, schema),
130 );
131
132 let mut connection_options = vec![
133 format!(
134 "ACCESS_KEY_ID='{}'",
135 expose_optional_secret(&self.config.s3_access_key_id)
136 ),
137 format!(
138 "SECRET_ACCESS_KEY='{}'",
139 expose_optional_secret(&self.config.s3_secret_access_key)
140 ),
141 ];
142
143 if let Some(region) = &self.config.s3_region {
144 connection_options.push(format!("REGION='{}'", region));
145 }
146
147 if let Some(endpoint) = &self.config.s3_endpoint {
148 connection_options.push(format!("ENDPOINT='{}'", endpoint));
149 }
150
151 let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
152 (s3_path, connection_str)
153 }
154
155 fn format_output_path(&self, file_path: &str) -> String {
156 format_uri(
157 "s3",
158 &self.config.s3_bucket,
159 &self.config.s3_root,
160 file_path,
161 )
162 }
163
164 fn mask_sensitive_info(&self, sql: &str) -> String {
165 mask_secrets(
166 sql.to_string(),
167 &[
168 expose_optional_secret(&self.config.s3_access_key_id),
169 expose_optional_secret(&self.config.s3_secret_access_key),
170 ],
171 )
172 }
173}
174
// Generates `OssBackend`: a `Clone` wrapper around `PrefixedOssConnection` whose
// `new` constructor runs `config.validate()` first (see `define_backend!`).
define_backend!(OssBackend, PrefixedOssConnection);
176
177impl StorageExport for OssBackend {
178 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
179 let oss_path = format_uri(
180 "oss",
181 &self.config.oss_bucket,
182 &self.config.oss_root,
183 &format!("{}/{}/", catalog, schema),
184 );
185
186 let connection_options = [
187 format!(
188 "ACCESS_KEY_ID='{}'",
189 expose_optional_secret(&self.config.oss_access_key_id)
190 ),
191 format!(
192 "ACCESS_KEY_SECRET='{}'",
193 expose_optional_secret(&self.config.oss_access_key_secret)
194 ),
195 ];
196
197 let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
198 (oss_path, connection_str)
199 }
200
201 fn format_output_path(&self, file_path: &str) -> String {
202 format_uri(
203 "oss",
204 &self.config.oss_bucket,
205 &self.config.oss_root,
206 file_path,
207 )
208 }
209
210 fn mask_sensitive_info(&self, sql: &str) -> String {
211 mask_secrets(
212 sql.to_string(),
213 &[
214 expose_optional_secret(&self.config.oss_access_key_id),
215 expose_optional_secret(&self.config.oss_access_key_secret),
216 ],
217 )
218 }
219}
220
// Generates `GcsBackend`: a `Clone` wrapper around `PrefixedGcsConnection` whose
// `new` constructor runs `config.validate()` first (see `define_backend!`).
define_backend!(GcsBackend, PrefixedGcsConnection);
222
223impl StorageExport for GcsBackend {
224 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
225 let gcs_path = format_uri(
226 "gcs",
227 &self.config.gcs_bucket,
228 &self.config.gcs_root,
229 &format!("{}/{}/", catalog, schema),
230 );
231
232 let mut connection_options = Vec::new();
233 let credential_path = expose_optional_secret(&self.config.gcs_credential_path);
234 if !credential_path.is_empty() {
235 connection_options.push(format!("CREDENTIAL_PATH='{}'", credential_path));
236 }
237
238 let credential = expose_optional_secret(&self.config.gcs_credential);
239 if !credential.is_empty() {
240 connection_options.push(format!("CREDENTIAL='{}'", credential));
241 }
242
243 if !self.config.gcs_endpoint.is_empty() {
244 connection_options.push(format!("ENDPOINT='{}'", self.config.gcs_endpoint));
245 }
246
247 let connection_str = if connection_options.is_empty() {
248 String::new()
249 } else {
250 format!(" CONNECTION ({})", connection_options.join(", "))
251 };
252
253 (gcs_path, connection_str)
254 }
255
256 fn format_output_path(&self, file_path: &str) -> String {
257 format_uri(
258 "gcs",
259 &self.config.gcs_bucket,
260 &self.config.gcs_root,
261 file_path,
262 )
263 }
264
265 fn mask_sensitive_info(&self, sql: &str) -> String {
266 mask_secrets(
267 sql.to_string(),
268 &[expose_optional_secret(&self.config.gcs_credential)],
269 )
270 }
271}
272
// Generates `AzblobBackend`: a `Clone` wrapper around `PrefixedAzblobConnection` whose
// `new` constructor runs `config.validate()` first (see `define_backend!`).
define_backend!(AzblobBackend, PrefixedAzblobConnection);
274
275impl StorageExport for AzblobBackend {
276 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
277 let azblob_path = format_uri(
278 "azblob",
279 &self.config.azblob_container,
280 &self.config.azblob_root,
281 &format!("{}/{}/", catalog, schema),
282 );
283
284 let mut connection_options = vec![
285 format!(
286 "ACCOUNT_NAME='{}'",
287 expose_optional_secret(&self.config.azblob_account_name)
288 ),
289 format!(
290 "ACCOUNT_KEY='{}'",
291 expose_optional_secret(&self.config.azblob_account_key)
292 ),
293 ];
294
295 if let Some(sas_token) = &self.config.azblob_sas_token {
296 connection_options.push(format!("SAS_TOKEN='{}'", sas_token));
297 }
298
299 let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
300 (azblob_path, connection_str)
301 }
302
303 fn format_output_path(&self, file_path: &str) -> String {
304 format_uri(
305 "azblob",
306 &self.config.azblob_container,
307 &self.config.azblob_root,
308 file_path,
309 )
310 }
311
312 fn mask_sensitive_info(&self, sql: &str) -> String {
313 mask_secrets(
314 sql.to_string(),
315 &[
316 expose_optional_secret(&self.config.azblob_account_name),
317 expose_optional_secret(&self.config.azblob_account_key),
318 ],
319 )
320 }
321}
322
/// The storage backend selected for an export, one variant per concrete backend.
#[derive(Clone)]
pub enum StorageType {
    /// Directory-path backend; the only variant not reported as remote storage.
    Fs(FsBackend),
    /// Backend producing `s3://` URIs.
    S3(S3Backend),
    /// Backend producing `oss://` URIs.
    Oss(OssBackend),
    /// Backend producing `gcs://` URIs.
    Gcs(GcsBackend),
    /// Backend producing `azblob://` URIs.
    Azblob(AzblobBackend),
}
331
332impl StorageExport for StorageType {
333 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
334 match self {
335 StorageType::Fs(backend) => backend.get_storage_path(catalog, schema),
336 StorageType::S3(backend) => backend.get_storage_path(catalog, schema),
337 StorageType::Oss(backend) => backend.get_storage_path(catalog, schema),
338 StorageType::Gcs(backend) => backend.get_storage_path(catalog, schema),
339 StorageType::Azblob(backend) => backend.get_storage_path(catalog, schema),
340 }
341 }
342
343 fn format_output_path(&self, file_path: &str) -> String {
344 match self {
345 StorageType::Fs(backend) => backend.format_output_path(file_path),
346 StorageType::S3(backend) => backend.format_output_path(file_path),
347 StorageType::Oss(backend) => backend.format_output_path(file_path),
348 StorageType::Gcs(backend) => backend.format_output_path(file_path),
349 StorageType::Azblob(backend) => backend.format_output_path(file_path),
350 }
351 }
352
353 fn mask_sensitive_info(&self, sql: &str) -> String {
354 match self {
355 StorageType::Fs(backend) => backend.mask_sensitive_info(sql),
356 StorageType::S3(backend) => backend.mask_sensitive_info(sql),
357 StorageType::Oss(backend) => backend.mask_sensitive_info(sql),
358 StorageType::Gcs(backend) => backend.mask_sensitive_info(sql),
359 StorageType::Azblob(backend) => backend.mask_sensitive_info(sql),
360 }
361 }
362}
363
364impl StorageType {
365 pub fn is_remote_storage(&self) -> bool {
367 !matches!(self, StorageType::Fs(_))
368 }
369}