1use common_base::secrets::{ExposeSecret, SecretString};
16use common_telemetry::info;
17use object_store::util::{join_path, normalize_path};
18use snafu::ResultExt;
19use url::Url;
20
21use crate::common::ObjectStoreConfig;
22use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
23use crate::data::export_v2::manifest::{DataFormat, TimeRange};
24use crate::data::path::data_dir_for_schema_chunk;
25use crate::data::snapshot_storage::StorageScheme;
26use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
27use crate::database::DatabaseClient;
28
29pub(super) struct CopyOptions {
30 pub(super) format: DataFormat,
31 pub(super) time_range: TimeRange,
32 pub(super) parallelism: usize,
33}
34
35pub(super) struct CopyTarget {
36 pub(super) location: String,
37 pub(super) connection: String,
38 secrets: Vec<Option<String>>,
39}
40
/// Fully resolved origin of an import `COPY DATABASE ... FROM` statement.
pub(crate) struct CopySource {
    /// Location literal placed after `FROM` (a local path or `<scheme>://<bucket>/<path>`).
    pub(crate) location: String,
    /// Either empty, or a ` CONNECTION (...)` clause (with a leading space) appended verbatim.
    pub(crate) connection: String,
    /// Secret values (each possibly absent) that must be redacted before logging SQL.
    secrets: Vec<Option<String>>,
}
46
47impl CopyTarget {
48 fn mask_sql(&self, sql: &str) -> String {
49 mask_secrets(sql, &self.secrets)
50 }
51}
52
53impl CopySource {
54 fn mask_sql(&self, sql: &str) -> String {
55 mask_secrets(sql, &self.secrets)
56 }
57}
58
59pub(super) fn build_copy_target(
60 snapshot_uri: &str,
61 storage: &ObjectStoreConfig,
62 schema: &str,
63 chunk_id: u32,
64) -> Result<CopyTarget> {
65 let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?;
66 Ok(CopyTarget {
67 location: location.location,
68 connection: location.connection,
69 secrets: location.secrets,
70 })
71}
72
73pub(crate) fn build_copy_source(
74 snapshot_uri: &str,
75 storage: &ObjectStoreConfig,
76 schema: &str,
77 chunk_id: u32,
78) -> Result<CopySource> {
79 let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?;
80 Ok(CopySource {
81 location: location.location,
82 connection: location.connection,
83 secrets: location.secrets,
84 })
85}
86
/// Direction-agnostic result shared by `build_copy_target` and `build_copy_source`.
struct CopyLocation {
    /// Location literal for the `TO` / `FROM` clause.
    location: String,
    /// Either empty, or a ` CONNECTION (...)` clause with a leading space.
    connection: String,
    /// Secret values (each possibly absent) to redact from logged SQL.
    secrets: Vec<Option<String>>,
}
92
93fn build_copy_location(
94 snapshot_uri: &str,
95 storage: &ObjectStoreConfig,
96 schema: &str,
97 chunk_id: u32,
98) -> Result<CopyLocation> {
99 let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
100 let scheme = StorageScheme::from_uri(snapshot_uri)?;
101 let suffix = data_dir_for_schema_chunk(schema, chunk_id);
102
103 match scheme {
104 StorageScheme::File => {
105 let root = url.to_file_path().map_err(|_| {
106 InvalidUriSnafu {
107 uri: snapshot_uri,
108 reason: "file:// URI must use an absolute path like file:///tmp/backup",
109 }
110 .build()
111 })?;
112 let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
113 Ok(CopyLocation {
114 location,
115 connection: String::new(),
116 secrets: Vec::new(),
117 })
118 }
119 StorageScheme::S3 => {
120 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
121 let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
122 let (connection, secrets) = build_s3_connection(storage);
123 Ok(CopyLocation {
124 location,
125 connection,
126 secrets,
127 })
128 }
129 StorageScheme::Oss => {
130 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
131 let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
132 let (connection, secrets) = build_oss_connection(storage);
133 Ok(CopyLocation {
134 location,
135 connection,
136 secrets,
137 })
138 }
139 StorageScheme::Gcs => {
140 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
141 let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
142 let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
143 Ok(CopyLocation {
144 location,
145 connection,
146 secrets,
147 })
148 }
149 StorageScheme::Azblob => {
150 let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
151 let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
152 let (connection, secrets) = build_azblob_connection(storage);
153 Ok(CopyLocation {
154 location,
155 connection,
156 secrets,
157 })
158 }
159 }
160}
161
162pub(super) async fn execute_copy_database(
163 database_client: &DatabaseClient,
164 catalog: &str,
165 schema: &str,
166 target: &CopyTarget,
167 options: &CopyOptions,
168) -> Result<()> {
169 let with_options = build_with_options(options);
170 let sql = format!(
171 r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
172 escape_sql_identifier(catalog),
173 escape_sql_identifier(schema),
174 escape_sql_literal(&target.location),
175 with_options,
176 target.connection
177 );
178 let safe_sql = target.mask_sql(&sql);
179 info!("Executing sql: {}", safe_sql);
180 database_client
181 .sql_in_public(&sql)
182 .await
183 .context(DatabaseSnafu)?;
184 Ok(())
185}
186
187pub(crate) async fn execute_copy_database_from(
188 database_client: &DatabaseClient,
189 catalog: &str,
190 schema: &str,
191 source: &CopySource,
192 format: DataFormat,
193) -> Result<()> {
194 let sql = format!(
195 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='{}'){};"#,
196 escape_sql_identifier(catalog),
197 escape_sql_identifier(schema),
198 escape_sql_literal(&source.location),
199 format,
200 source.connection
201 );
202 let safe_sql = source.mask_sql(&sql);
203 info!("Executing sql: {}", safe_sql);
204 database_client
205 .sql_in_public(&sql)
206 .await
207 .context(DatabaseSnafu)?;
208 Ok(())
209}
210
211fn build_with_options(options: &CopyOptions) -> String {
212 let mut parts = vec![format!("FORMAT='{}'", options.format)];
213 if let Some(start) = options.time_range.start {
214 parts.push(format!(
215 "START_TIME='{}'",
216 escape_sql_literal(&start.to_rfc3339())
217 ));
218 }
219 if let Some(end) = options.time_range.end {
220 parts.push(format!(
221 "END_TIME='{}'",
222 escape_sql_literal(&end.to_rfc3339())
223 ));
224 }
225 parts.push(format!("PARALLELISM={}", options.parallelism));
226 parts.join(", ")
227}
228
229fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
230 let bucket = url.host_str().unwrap_or("").to_string();
231 if bucket.is_empty() {
232 return InvalidUriSnafu {
233 uri: snapshot_uri,
234 reason: "URI must include bucket/container in host",
235 }
236 .fail();
237 }
238 let root = url
239 .path()
240 .trim_start_matches('/')
241 .trim_end_matches('/')
242 .to_string();
243 Ok((bucket, root))
244}
245
246fn join_root(root: &str, suffix: &str) -> String {
247 join_path(root, suffix).trim_start_matches('/').to_string()
248}
249
250fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
251 let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
252 let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
253
254 let mut options = Vec::new();
255 if let Some(access_key_id) = &access_key_id {
256 options.push(format!(
257 "ACCESS_KEY_ID='{}'",
258 escape_sql_literal(access_key_id)
259 ));
260 }
261 if let Some(secret_access_key) = &secret_access_key {
262 options.push(format!(
263 "SECRET_ACCESS_KEY='{}'",
264 escape_sql_literal(secret_access_key)
265 ));
266 }
267 if let Some(region) = &storage.s3.s3_region {
268 options.push(format!("REGION='{}'", escape_sql_literal(region)));
269 }
270 if let Some(endpoint) = &storage.s3.s3_endpoint {
271 options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
272 }
273
274 let secrets = vec![access_key_id, secret_access_key];
275 let connection = if options.is_empty() {
276 String::new()
277 } else {
278 format!(" CONNECTION ({})", options.join(", "))
279 };
280 (connection, secrets)
281}
282
283fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
284 let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
285 let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
286
287 let mut options = Vec::new();
288 if let Some(access_key_id) = &access_key_id {
289 options.push(format!(
290 "ACCESS_KEY_ID='{}'",
291 escape_sql_literal(access_key_id)
292 ));
293 }
294 if let Some(access_key_secret) = &access_key_secret {
295 options.push(format!(
296 "ACCESS_KEY_SECRET='{}'",
297 escape_sql_literal(access_key_secret)
298 ));
299 }
300 if !storage.oss.oss_endpoint.is_empty() {
301 options.push(format!(
302 "ENDPOINT='{}'",
303 escape_sql_literal(&storage.oss.oss_endpoint)
304 ));
305 }
306
307 let secrets = vec![access_key_id, access_key_secret];
308 let connection = if options.is_empty() {
309 String::new()
310 } else {
311 format!(" CONNECTION ({})", options.join(", "))
312 };
313 (connection, secrets)
314}
315
316fn build_gcs_connection(
317 storage: &ObjectStoreConfig,
318 snapshot_uri: &str,
319) -> Result<(String, Vec<Option<String>>)> {
320 let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
321 let credential = expose_optional_secret(&storage.gcs.gcs_credential);
322
323 if credential.is_none() && credential_path.is_some() {
324 return InvalidUriSnafu {
325 uri: snapshot_uri,
326 reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
327 }
328 .fail();
329 }
330
331 let mut options = Vec::new();
332 if let Some(credential) = &credential {
333 options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
334 }
335 if !storage.gcs.gcs_scope.is_empty() {
336 options.push(format!(
337 "SCOPE='{}'",
338 escape_sql_literal(&storage.gcs.gcs_scope)
339 ));
340 }
341 if !storage.gcs.gcs_endpoint.is_empty() {
342 options.push(format!(
343 "ENDPOINT='{}'",
344 escape_sql_literal(&storage.gcs.gcs_endpoint)
345 ));
346 }
347
348 let connection = if options.is_empty() {
349 String::new()
350 } else {
351 format!(" CONNECTION ({})", options.join(", "))
352 };
353 let secrets = vec![credential_path, credential];
354 Ok((connection, secrets))
355}
356
357fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
358 let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
359 let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
360 let sas_token = storage.azblob.azblob_sas_token.clone();
361
362 let mut options = Vec::new();
363 if let Some(account_name) = &account_name {
364 options.push(format!(
365 "ACCOUNT_NAME='{}'",
366 escape_sql_literal(account_name)
367 ));
368 }
369 if let Some(account_key) = &account_key {
370 options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
371 }
372 if let Some(sas_token) = &sas_token {
373 options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
374 }
375 if !storage.azblob.azblob_endpoint.is_empty() {
376 options.push(format!(
377 "ENDPOINT='{}'",
378 escape_sql_literal(&storage.azblob.azblob_endpoint)
379 ));
380 }
381
382 let secrets = vec![account_name, account_key, sas_token];
383 let connection = if options.is_empty() {
384 String::new()
385 } else {
386 format!(" CONNECTION ({})", options.join(", "))
387 };
388 (connection, secrets)
389}
390
391fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
392 secret.as_ref().map(|s| s.expose_secret().to_owned())
393}
394
395fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
396 let mut masked = sql.to_string();
397 for secret in secrets {
398 if let Some(secret) = secret
399 && !secret.is_empty()
400 {
401 let escaped = escape_sql_literal(secret);
402 if escaped != *secret {
403 masked = masked.replace(&escaped, "[REDACTED]");
404 }
405 masked = masked.replace(secret, "[REDACTED]");
406 }
407 }
408 masked
409}
410
#[cfg(test)]
mod tests {
    use common_base::secrets::SecretString;
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;
    use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};

    /// Wraps a literal into the `SecretString` type used by the storage config.
    fn secret(value: &str) -> SecretString {
        SecretString::from(value.to_string())
    }

    #[test]
    fn test_build_oss_connection_includes_endpoint() {
        let oss = PrefixedOssConnection {
            oss_endpoint: "https://oss.example.com".to_string(),
            oss_access_key_id: Some(secret("key_id")),
            oss_access_key_secret: Some(secret("key_secret")),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            oss,
            ..Default::default()
        };

        let connection = build_oss_connection(&storage).0;
        assert!(connection.contains("ENDPOINT='https://oss.example.com'"));
    }

    #[test]
    fn test_build_gcs_connection_uses_scope_and_inline_credential() {
        let gcs = PrefixedGcsConnection {
            gcs_scope: "scope-a".to_string(),
            gcs_endpoint: "https://storage.googleapis.com".to_string(),
            gcs_credential: Some(secret("credential-json")),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            gcs,
            ..Default::default()
        };

        let (connection, _) = build_gcs_connection(&storage, "gcs://bucket/root")
            .expect("inline credential should be accepted");
        for fragment in [
            "CREDENTIAL='credential-json'",
            "SCOPE='scope-a'",
            "ENDPOINT='https://storage.googleapis.com'",
        ] {
            assert!(
                connection.contains(fragment),
                "missing `{fragment}` in {connection}"
            );
        }
        assert!(!connection.contains("CREDENTIAL_PATH"));
    }

    #[test]
    fn test_build_gcs_connection_rejects_credential_path_only() {
        let gcs = PrefixedGcsConnection {
            gcs_scope: "scope-a".to_string(),
            gcs_credential_path: Some(secret("/tmp/creds.json")),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            gcs,
            ..Default::default()
        };

        let message = build_gcs_connection(&storage, "gcs://bucket/root")
            .expect_err("credential_path-only should be rejected")
            .to_string();
        assert!(message.contains("gcs_credential_path is not supported"));
    }

    #[test]
    fn test_build_azblob_connection_includes_endpoint() {
        let azblob = PrefixedAzblobConnection {
            azblob_account_name: Some(secret("account")),
            azblob_account_key: Some(secret("key")),
            azblob_endpoint: "https://blob.example.com".to_string(),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            azblob,
            ..Default::default()
        };

        let connection = build_azblob_connection(&storage).0;
        assert!(connection.contains("ENDPOINT='https://blob.example.com'"));
    }

    #[test]
    fn test_build_azblob_connection_redacts_sas_token() {
        let azblob = PrefixedAzblobConnection {
            azblob_account_name: Some(secret("account")),
            azblob_account_key: Some(secret("key")),
            azblob_sas_token: Some("sig=secret-token".to_string()),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            azblob,
            ..Default::default()
        };

        let (connection, secrets) = build_azblob_connection(&storage);
        let masked = mask_secrets(&connection, &secrets);

        assert!(connection.contains("SAS_TOKEN='sig=secret-token'"));
        assert!(masked.contains("SAS_TOKEN='[REDACTED]'"));
        assert!(!masked.contains("sig=secret-token"));
    }

    #[test]
    fn test_mask_secrets_redacts_sql_escaped_literals() {
        let sql =
            "COPY DATABASE \"greptime\".\"public\" TO 's3://bucket' CONNECTION (SECRET='ab''cd');";
        let secrets = [Some("ab'cd".to_string())];
        let masked = mask_secrets(sql, &secrets);

        assert!(!masked.contains("ab'cd"));
        assert!(!masked.contains("ab''cd"));
        assert!(masked.contains("SECRET='[REDACTED]'"));
    }

    #[test]
    fn test_build_copy_target_decodes_file_uri_path() {
        let storage = ObjectStoreConfig::default();
        let snapshot_root = create_temp_dir("my backup");
        let root_path = snapshot_root.path();
        let snapshot_uri = Url::from_file_path(root_path)
            .expect("absolute platform path should convert to file:// URI")
            .to_string();
        let expected = normalize_path(&format!(
            "{}/{}",
            root_path.to_string_lossy(),
            data_dir_for_schema_chunk("public", 7)
        ));

        let target = build_copy_target(&snapshot_uri, &storage, "public", 7)
            .expect("file:// copy target should be built");

        // The URI is percent-encoded, but the resolved local path must not be.
        assert!(snapshot_uri.contains("%20"));
        assert!(!target.location.contains("%20"));
        assert!(target.location.contains("my backup"));
        assert_eq!(target.location, expected);
    }
}