1use common_base::secrets::{ExposeSecret, SecretString};
16use common_telemetry::info;
17use object_store::util::{join_path, normalize_path};
18use snafu::ResultExt;
19use url::Url;
20
21use crate::common::ObjectStoreConfig;
22use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
23use crate::data::export_v2::manifest::{DataFormat, TimeRange};
24use crate::data::path::data_dir_for_schema_chunk;
25use crate::data::snapshot_storage::StorageScheme;
26use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
27use crate::database::DatabaseClient;
28
/// Options rendered into the `WITH (...)` clause of a `COPY DATABASE` statement.
pub(super) struct CopyOptions {
    /// Output data format; rendered via `Display` into `FORMAT='...'`.
    pub(super) format: DataFormat,
    /// Optional export window; set bounds become `START_TIME`/`END_TIME`.
    pub(super) time_range: TimeRange,
    /// Value emitted as the `PARALLELISM` option.
    pub(super) parallelism: usize,
}
34
/// A fully resolved destination for `COPY DATABASE ... TO`.
pub(super) struct CopyTarget {
    /// Path or URI placed after `TO` (already includes the chunk suffix).
    pub(super) location: String,
    /// Pre-rendered ` CONNECTION (...)` clause; empty when no options apply.
    pub(super) connection: String,
    // Secret values embedded in `connection`, retained so they can be masked
    // before the statement is logged. `None` entries are credentials that
    // were not configured.
    secrets: Vec<Option<String>>,
}
40
41impl CopyTarget {
42 fn mask_sql(&self, sql: &str) -> String {
43 mask_secrets(sql, &self.secrets)
44 }
45}
46
47pub(super) fn build_copy_target(
48 snapshot_uri: &str,
49 storage: &ObjectStoreConfig,
50 schema: &str,
51 chunk_id: u32,
52) -> Result<CopyTarget> {
53 let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
54 let scheme = StorageScheme::from_uri(snapshot_uri)?;
55 let suffix = data_dir_for_schema_chunk(schema, chunk_id);
56
57 match scheme {
58 StorageScheme::File => {
59 let root = url.to_file_path().map_err(|_| {
60 InvalidUriSnafu {
61 uri: snapshot_uri,
62 reason: "file:// URI must use an absolute path like file:///tmp/backup",
63 }
64 .build()
65 })?;
66 let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
67 Ok(CopyTarget {
68 location,
69 connection: String::new(),
70 secrets: Vec::new(),
71 })
72 }
73 StorageScheme::S3 => {
74 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
75 let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
76 let (connection, secrets) = build_s3_connection(storage);
77 Ok(CopyTarget {
78 location,
79 connection,
80 secrets,
81 })
82 }
83 StorageScheme::Oss => {
84 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
85 let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
86 let (connection, secrets) = build_oss_connection(storage);
87 Ok(CopyTarget {
88 location,
89 connection,
90 secrets,
91 })
92 }
93 StorageScheme::Gcs => {
94 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
95 let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
96 let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
97 Ok(CopyTarget {
98 location,
99 connection,
100 secrets,
101 })
102 }
103 StorageScheme::Azblob => {
104 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
105 let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
106 let (connection, secrets) = build_azblob_connection(storage);
107 Ok(CopyTarget {
108 location,
109 connection,
110 secrets,
111 })
112 }
113 }
114}
115
116pub(super) async fn execute_copy_database(
117 database_client: &DatabaseClient,
118 catalog: &str,
119 schema: &str,
120 target: &CopyTarget,
121 options: &CopyOptions,
122) -> Result<()> {
123 let with_options = build_with_options(options);
124 let sql = format!(
125 r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
126 escape_sql_identifier(catalog),
127 escape_sql_identifier(schema),
128 escape_sql_literal(&target.location),
129 with_options,
130 target.connection
131 );
132 let safe_sql = target.mask_sql(&sql);
133 info!("Executing sql: {}", safe_sql);
134 database_client
135 .sql_in_public(&sql)
136 .await
137 .context(DatabaseSnafu)?;
138 Ok(())
139}
140
141fn build_with_options(options: &CopyOptions) -> String {
142 let mut parts = vec![format!("FORMAT='{}'", options.format)];
143 if let Some(start) = options.time_range.start {
144 parts.push(format!(
145 "START_TIME='{}'",
146 escape_sql_literal(&start.to_rfc3339())
147 ));
148 }
149 if let Some(end) = options.time_range.end {
150 parts.push(format!(
151 "END_TIME='{}'",
152 escape_sql_literal(&end.to_rfc3339())
153 ));
154 }
155 parts.push(format!("PARALLELISM={}", options.parallelism));
156 parts.join(", ")
157}
158
159fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
160 let bucket = url.host_str().unwrap_or("").to_string();
161 if bucket.is_empty() {
162 return InvalidUriSnafu {
163 uri: snapshot_uri,
164 reason: "URI must include bucket/container in host",
165 }
166 .fail();
167 }
168 let root = url
169 .path()
170 .trim_start_matches('/')
171 .trim_end_matches('/')
172 .to_string();
173 Ok((bucket, root))
174}
175
176fn join_root(root: &str, suffix: &str) -> String {
177 join_path(root, suffix).trim_start_matches('/').to_string()
178}
179
180fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
181 let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
182 let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
183
184 let mut options = Vec::new();
185 if let Some(access_key_id) = &access_key_id {
186 options.push(format!(
187 "ACCESS_KEY_ID='{}'",
188 escape_sql_literal(access_key_id)
189 ));
190 }
191 if let Some(secret_access_key) = &secret_access_key {
192 options.push(format!(
193 "SECRET_ACCESS_KEY='{}'",
194 escape_sql_literal(secret_access_key)
195 ));
196 }
197 if let Some(region) = &storage.s3.s3_region {
198 options.push(format!("REGION='{}'", escape_sql_literal(region)));
199 }
200 if let Some(endpoint) = &storage.s3.s3_endpoint {
201 options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
202 }
203
204 let secrets = vec![access_key_id, secret_access_key];
205 let connection = if options.is_empty() {
206 String::new()
207 } else {
208 format!(" CONNECTION ({})", options.join(", "))
209 };
210 (connection, secrets)
211}
212
213fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
214 let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
215 let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
216
217 let mut options = Vec::new();
218 if let Some(access_key_id) = &access_key_id {
219 options.push(format!(
220 "ACCESS_KEY_ID='{}'",
221 escape_sql_literal(access_key_id)
222 ));
223 }
224 if let Some(access_key_secret) = &access_key_secret {
225 options.push(format!(
226 "ACCESS_KEY_SECRET='{}'",
227 escape_sql_literal(access_key_secret)
228 ));
229 }
230 if !storage.oss.oss_endpoint.is_empty() {
231 options.push(format!(
232 "ENDPOINT='{}'",
233 escape_sql_literal(&storage.oss.oss_endpoint)
234 ));
235 }
236
237 let secrets = vec![access_key_id, access_key_secret];
238 let connection = if options.is_empty() {
239 String::new()
240 } else {
241 format!(" CONNECTION ({})", options.join(", "))
242 };
243 (connection, secrets)
244}
245
246fn build_gcs_connection(
247 storage: &ObjectStoreConfig,
248 snapshot_uri: &str,
249) -> Result<(String, Vec<Option<String>>)> {
250 let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
251 let credential = expose_optional_secret(&storage.gcs.gcs_credential);
252
253 if credential.is_none() && credential_path.is_some() {
254 return InvalidUriSnafu {
255 uri: snapshot_uri,
256 reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
257 }
258 .fail();
259 }
260
261 let mut options = Vec::new();
262 if let Some(credential) = &credential {
263 options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
264 }
265 if !storage.gcs.gcs_scope.is_empty() {
266 options.push(format!(
267 "SCOPE='{}'",
268 escape_sql_literal(&storage.gcs.gcs_scope)
269 ));
270 }
271 if !storage.gcs.gcs_endpoint.is_empty() {
272 options.push(format!(
273 "ENDPOINT='{}'",
274 escape_sql_literal(&storage.gcs.gcs_endpoint)
275 ));
276 }
277
278 let connection = if options.is_empty() {
279 String::new()
280 } else {
281 format!(" CONNECTION ({})", options.join(", "))
282 };
283 let secrets = vec![credential_path, credential];
284 Ok((connection, secrets))
285}
286
287fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
288 let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
289 let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
290 let sas_token = storage.azblob.azblob_sas_token.clone();
291
292 let mut options = Vec::new();
293 if let Some(account_name) = &account_name {
294 options.push(format!(
295 "ACCOUNT_NAME='{}'",
296 escape_sql_literal(account_name)
297 ));
298 }
299 if let Some(account_key) = &account_key {
300 options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
301 }
302 if let Some(sas_token) = &sas_token {
303 options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
304 }
305 if !storage.azblob.azblob_endpoint.is_empty() {
306 options.push(format!(
307 "ENDPOINT='{}'",
308 escape_sql_literal(&storage.azblob.azblob_endpoint)
309 ));
310 }
311
312 let secrets = vec![account_name, account_key, sas_token];
313 let connection = if options.is_empty() {
314 String::new()
315 } else {
316 format!(" CONNECTION ({})", options.join(", "))
317 };
318 (connection, secrets)
319}
320
321fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
322 secret.as_ref().map(|s| s.expose_secret().to_owned())
323}
324
/// Replaces every non-empty secret value occurring in `sql` with
/// `[REDACTED]` so the statement can be logged safely.
///
/// `None` and empty-string entries are skipped (replacing an empty pattern
/// would corrupt the statement).
fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
    secrets
        .iter()
        .flatten()
        .filter(|secret| !secret.is_empty())
        .fold(sql.to_string(), |masked, secret| {
            masked.replace(secret.as_str(), "[REDACTED]")
        })
}
336
#[cfg(test)]
mod tests {
    use common_base::secrets::SecretString;
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;
    use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};

    // A configured OSS endpoint must surface in the CONNECTION clause.
    #[test]
    fn test_build_oss_connection_includes_endpoint() {
        let storage = ObjectStoreConfig {
            oss: PrefixedOssConnection {
                oss_endpoint: "https://oss.example.com".to_string(),
                oss_access_key_id: Some(SecretString::from("key_id".to_string())),
                oss_access_key_secret: Some(SecretString::from("key_secret".to_string())),
                ..Default::default()
            },
            ..Default::default()
        };

        let (connection, _) = build_oss_connection(&storage);
        assert!(connection.contains("ENDPOINT='https://oss.example.com'"));
    }

    // Inline credential, scope and endpoint are all forwarded; no
    // CREDENTIAL_PATH option may ever appear in the clause.
    #[test]
    fn test_build_gcs_connection_uses_scope_and_inline_credential() {
        let storage = ObjectStoreConfig {
            gcs: PrefixedGcsConnection {
                gcs_scope: "scope-a".to_string(),
                gcs_endpoint: "https://storage.googleapis.com".to_string(),
                gcs_credential: Some(SecretString::from("credential-json".to_string())),
                ..Default::default()
            },
            ..Default::default()
        };

        let (connection, _) = build_gcs_connection(&storage, "gcs://bucket/root").unwrap();
        assert!(connection.contains("CREDENTIAL='credential-json'"));
        assert!(connection.contains("SCOPE='scope-a'"));
        assert!(connection.contains("ENDPOINT='https://storage.googleapis.com'"));
        assert!(!connection.contains("CREDENTIAL_PATH"));
    }

    // A credential path without an inline credential is rejected, since the
    // file cannot be read by the server executing COPY.
    #[test]
    fn test_build_gcs_connection_rejects_credential_path_only() {
        let storage = ObjectStoreConfig {
            gcs: PrefixedGcsConnection {
                gcs_scope: "scope-a".to_string(),
                gcs_credential_path: Some(SecretString::from("/tmp/creds.json".to_string())),
                ..Default::default()
            },
            ..Default::default()
        };

        let error = build_gcs_connection(&storage, "gcs://bucket/root")
            .expect_err("credential_path-only should be rejected")
            .to_string();
        assert!(error.contains("gcs_credential_path is not supported"));
    }

    // A configured Azure Blob endpoint must surface in the CONNECTION clause.
    #[test]
    fn test_build_azblob_connection_includes_endpoint() {
        let storage = ObjectStoreConfig {
            azblob: PrefixedAzblobConnection {
                azblob_account_name: Some(SecretString::from("account".to_string())),
                azblob_account_key: Some(SecretString::from("key".to_string())),
                azblob_endpoint: "https://blob.example.com".to_string(),
                ..Default::default()
            },
            ..Default::default()
        };

        let (connection, _) = build_azblob_connection(&storage);
        assert!(connection.contains("ENDPOINT='https://blob.example.com'"));
    }

    // The SAS token appears verbatim in the raw clause but must be fully
    // redacted once the returned secrets are applied via mask_secrets.
    #[test]
    fn test_build_azblob_connection_redacts_sas_token() {
        let storage = ObjectStoreConfig {
            azblob: PrefixedAzblobConnection {
                azblob_account_name: Some(SecretString::from("account".to_string())),
                azblob_account_key: Some(SecretString::from("key".to_string())),
                azblob_sas_token: Some("sig=secret-token".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };

        let (connection, secrets) = build_azblob_connection(&storage);
        let masked = mask_secrets(&connection, &secrets);

        assert!(connection.contains("SAS_TOKEN='sig=secret-token'"));
        assert!(masked.contains("SAS_TOKEN='[REDACTED]'"));
        assert!(!masked.contains("sig=secret-token"));
    }

    // file:// URIs with percent-encoded characters (e.g. spaces) must be
    // decoded into a real filesystem path, not used verbatim.
    #[test]
    fn test_build_copy_target_decodes_file_uri_path() {
        let storage = ObjectStoreConfig::default();
        let snapshot_root = create_temp_dir("my backup");
        let snapshot_uri = Url::from_file_path(snapshot_root.path())
            .expect("absolute platform path should convert to file:// URI")
            .to_string();
        let expected = normalize_path(&format!(
            "{}/{}",
            snapshot_root.path().to_string_lossy(),
            data_dir_for_schema_chunk("public", 7)
        ));
        let target = build_copy_target(&snapshot_uri, &storage, "public", 7)
            .expect("file:// copy target should be built");

        assert!(snapshot_uri.contains("%20"));
        assert!(!target.location.contains("%20"));
        assert!(target.location.contains("my backup"));
        assert_eq!(target.location, expected);
    }
}