Skip to main content

cli/data/export_v2/
data.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_base::secrets::{ExposeSecret, SecretString};
16use common_telemetry::info;
17use object_store::util::{join_path, normalize_path};
18use snafu::ResultExt;
19use url::Url;
20
21use crate::common::ObjectStoreConfig;
22use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
23use crate::data::export_v2::manifest::{DataFormat, TimeRange};
24use crate::data::path::data_dir_for_schema_chunk;
25use crate::data::snapshot_storage::StorageScheme;
26use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
27use crate::database::DatabaseClient;
28
29pub(super) struct CopyOptions {
30    pub(super) format: DataFormat,
31    pub(super) time_range: TimeRange,
32    pub(super) parallelism: usize,
33}
34
35pub(super) struct CopyTarget {
36    pub(super) location: String,
37    pub(super) connection: String,
38    secrets: Vec<Option<String>>,
39}
40
/// Origin of a `COPY DATABASE ... FROM` statement.
pub(crate) struct CopySource {
    /// Path or URI placed inside the quoted `FROM '<location>'` literal.
    pub(crate) location: String,
    /// Either empty or a ready-to-append ` CONNECTION (...)` clause.
    pub(crate) connection: String,
    /// Plaintext credential values that must be redacted before the SQL is logged.
    secrets: Vec<Option<String>>,
}
46
47impl CopyTarget {
48    fn mask_sql(&self, sql: &str) -> String {
49        mask_secrets(sql, &self.secrets)
50    }
51}
52
53impl CopySource {
54    fn mask_sql(&self, sql: &str) -> String {
55        mask_secrets(sql, &self.secrets)
56    }
57}
58
59pub(super) fn build_copy_target(
60    snapshot_uri: &str,
61    storage: &ObjectStoreConfig,
62    schema: &str,
63    chunk_id: u32,
64) -> Result<CopyTarget> {
65    let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?;
66    Ok(CopyTarget {
67        location: location.location,
68        connection: location.connection,
69        secrets: location.secrets,
70    })
71}
72
73pub(crate) fn build_copy_source(
74    snapshot_uri: &str,
75    storage: &ObjectStoreConfig,
76    schema: &str,
77    chunk_id: u32,
78) -> Result<CopySource> {
79    let location = build_copy_location(snapshot_uri, storage, schema, chunk_id)?;
80    Ok(CopySource {
81        location: location.location,
82        connection: location.connection,
83        secrets: location.secrets,
84    })
85}
86
/// Direction-agnostic result of resolving a snapshot URI for one schema chunk;
/// converted into a `CopyTarget` or `CopySource` by the public builders.
struct CopyLocation {
    /// Fully resolved path or URI of the chunk's data directory.
    location: String,
    /// Either empty or a ready-to-append ` CONNECTION (...)` clause.
    connection: String,
    /// Plaintext credential values to redact from logged SQL.
    secrets: Vec<Option<String>>,
}
92
93fn build_copy_location(
94    snapshot_uri: &str,
95    storage: &ObjectStoreConfig,
96    schema: &str,
97    chunk_id: u32,
98) -> Result<CopyLocation> {
99    let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
100    let scheme = StorageScheme::from_uri(snapshot_uri)?;
101    let suffix = data_dir_for_schema_chunk(schema, chunk_id);
102
103    match scheme {
104        StorageScheme::File => {
105            let root = url.to_file_path().map_err(|_| {
106                InvalidUriSnafu {
107                    uri: snapshot_uri,
108                    reason: "file:// URI must use an absolute path like file:///tmp/backup",
109                }
110                .build()
111            })?;
112            let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
113            Ok(CopyLocation {
114                location,
115                connection: String::new(),
116                secrets: Vec::new(),
117            })
118        }
119        StorageScheme::S3 => {
120            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
121            let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
122            let (connection, secrets) = build_s3_connection(storage);
123            Ok(CopyLocation {
124                location,
125                connection,
126                secrets,
127            })
128        }
129        StorageScheme::Oss => {
130            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
131            let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
132            let (connection, secrets) = build_oss_connection(storage);
133            Ok(CopyLocation {
134                location,
135                connection,
136                secrets,
137            })
138        }
139        StorageScheme::Gcs => {
140            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
141            let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
142            let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
143            Ok(CopyLocation {
144                location,
145                connection,
146                secrets,
147            })
148        }
149        StorageScheme::Azblob => {
150            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
151            let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
152            let (connection, secrets) = build_azblob_connection(storage);
153            Ok(CopyLocation {
154                location,
155                connection,
156                secrets,
157            })
158        }
159    }
160}
161
162pub(super) async fn execute_copy_database(
163    database_client: &DatabaseClient,
164    catalog: &str,
165    schema: &str,
166    target: &CopyTarget,
167    options: &CopyOptions,
168) -> Result<()> {
169    let with_options = build_with_options(options);
170    let sql = format!(
171        r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
172        escape_sql_identifier(catalog),
173        escape_sql_identifier(schema),
174        escape_sql_literal(&target.location),
175        with_options,
176        target.connection
177    );
178    let safe_sql = target.mask_sql(&sql);
179    info!("Executing sql: {}", safe_sql);
180    database_client
181        .sql_in_public(&sql)
182        .await
183        .context(DatabaseSnafu)?;
184    Ok(())
185}
186
187pub(crate) async fn execute_copy_database_from(
188    database_client: &DatabaseClient,
189    catalog: &str,
190    schema: &str,
191    source: &CopySource,
192    format: DataFormat,
193) -> Result<()> {
194    let sql = format!(
195        r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='{}'){};"#,
196        escape_sql_identifier(catalog),
197        escape_sql_identifier(schema),
198        escape_sql_literal(&source.location),
199        format,
200        source.connection
201    );
202    let safe_sql = source.mask_sql(&sql);
203    info!("Executing sql: {}", safe_sql);
204    database_client
205        .sql_in_public(&sql)
206        .await
207        .context(DatabaseSnafu)?;
208    Ok(())
209}
210
211fn build_with_options(options: &CopyOptions) -> String {
212    let mut parts = vec![format!("FORMAT='{}'", options.format)];
213    if let Some(start) = options.time_range.start {
214        parts.push(format!(
215            "START_TIME='{}'",
216            escape_sql_literal(&start.to_rfc3339())
217        ));
218    }
219    if let Some(end) = options.time_range.end {
220        parts.push(format!(
221            "END_TIME='{}'",
222            escape_sql_literal(&end.to_rfc3339())
223        ));
224    }
225    parts.push(format!("PARALLELISM={}", options.parallelism));
226    parts.join(", ")
227}
228
229fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
230    let bucket = url.host_str().unwrap_or("").to_string();
231    if bucket.is_empty() {
232        return InvalidUriSnafu {
233            uri: snapshot_uri,
234            reason: "URI must include bucket/container in host",
235        }
236        .fail();
237    }
238    let root = url
239        .path()
240        .trim_start_matches('/')
241        .trim_end_matches('/')
242        .to_string();
243    Ok((bucket, root))
244}
245
246fn join_root(root: &str, suffix: &str) -> String {
247    join_path(root, suffix).trim_start_matches('/').to_string()
248}
249
250fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
251    let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
252    let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
253
254    let mut options = Vec::new();
255    if let Some(access_key_id) = &access_key_id {
256        options.push(format!(
257            "ACCESS_KEY_ID='{}'",
258            escape_sql_literal(access_key_id)
259        ));
260    }
261    if let Some(secret_access_key) = &secret_access_key {
262        options.push(format!(
263            "SECRET_ACCESS_KEY='{}'",
264            escape_sql_literal(secret_access_key)
265        ));
266    }
267    if let Some(region) = &storage.s3.s3_region {
268        options.push(format!("REGION='{}'", escape_sql_literal(region)));
269    }
270    if let Some(endpoint) = &storage.s3.s3_endpoint {
271        options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
272    }
273
274    let secrets = vec![access_key_id, secret_access_key];
275    let connection = if options.is_empty() {
276        String::new()
277    } else {
278        format!(" CONNECTION ({})", options.join(", "))
279    };
280    (connection, secrets)
281}
282
283fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
284    let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
285    let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
286
287    let mut options = Vec::new();
288    if let Some(access_key_id) = &access_key_id {
289        options.push(format!(
290            "ACCESS_KEY_ID='{}'",
291            escape_sql_literal(access_key_id)
292        ));
293    }
294    if let Some(access_key_secret) = &access_key_secret {
295        options.push(format!(
296            "ACCESS_KEY_SECRET='{}'",
297            escape_sql_literal(access_key_secret)
298        ));
299    }
300    if !storage.oss.oss_endpoint.is_empty() {
301        options.push(format!(
302            "ENDPOINT='{}'",
303            escape_sql_literal(&storage.oss.oss_endpoint)
304        ));
305    }
306
307    let secrets = vec![access_key_id, access_key_secret];
308    let connection = if options.is_empty() {
309        String::new()
310    } else {
311        format!(" CONNECTION ({})", options.join(", "))
312    };
313    (connection, secrets)
314}
315
316fn build_gcs_connection(
317    storage: &ObjectStoreConfig,
318    snapshot_uri: &str,
319) -> Result<(String, Vec<Option<String>>)> {
320    let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
321    let credential = expose_optional_secret(&storage.gcs.gcs_credential);
322
323    if credential.is_none() && credential_path.is_some() {
324        return InvalidUriSnafu {
325            uri: snapshot_uri,
326            reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
327        }
328        .fail();
329    }
330
331    let mut options = Vec::new();
332    if let Some(credential) = &credential {
333        options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
334    }
335    if !storage.gcs.gcs_scope.is_empty() {
336        options.push(format!(
337            "SCOPE='{}'",
338            escape_sql_literal(&storage.gcs.gcs_scope)
339        ));
340    }
341    if !storage.gcs.gcs_endpoint.is_empty() {
342        options.push(format!(
343            "ENDPOINT='{}'",
344            escape_sql_literal(&storage.gcs.gcs_endpoint)
345        ));
346    }
347
348    let connection = if options.is_empty() {
349        String::new()
350    } else {
351        format!(" CONNECTION ({})", options.join(", "))
352    };
353    let secrets = vec![credential_path, credential];
354    Ok((connection, secrets))
355}
356
357fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
358    let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
359    let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
360    let sas_token = storage.azblob.azblob_sas_token.clone();
361
362    let mut options = Vec::new();
363    if let Some(account_name) = &account_name {
364        options.push(format!(
365            "ACCOUNT_NAME='{}'",
366            escape_sql_literal(account_name)
367        ));
368    }
369    if let Some(account_key) = &account_key {
370        options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
371    }
372    if let Some(sas_token) = &sas_token {
373        options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
374    }
375    if !storage.azblob.azblob_endpoint.is_empty() {
376        options.push(format!(
377            "ENDPOINT='{}'",
378            escape_sql_literal(&storage.azblob.azblob_endpoint)
379        ));
380    }
381
382    let secrets = vec![account_name, account_key, sas_token];
383    let connection = if options.is_empty() {
384        String::new()
385    } else {
386        format!(" CONNECTION ({})", options.join(", "))
387    };
388    (connection, secrets)
389}
390
391fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
392    secret.as_ref().map(|s| s.expose_secret().to_owned())
393}
394
395fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
396    let mut masked = sql.to_string();
397    for secret in secrets {
398        if let Some(secret) = secret
399            && !secret.is_empty()
400        {
401            let escaped = escape_sql_literal(secret);
402            if escaped != *secret {
403                masked = masked.replace(&escaped, "[REDACTED]");
404            }
405            masked = masked.replace(secret, "[REDACTED]");
406        }
407    }
408    masked
409}
410
#[cfg(test)]
mod tests {
    use common_base::secrets::SecretString;
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;
    use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};

    /// Wraps a literal the way secret-bearing configuration fields store it.
    fn secret(value: &str) -> Option<SecretString> {
        Some(SecretString::from(value.to_string()))
    }

    /// A configured OSS endpoint is forwarded in the connection clause.
    #[test]
    fn test_build_oss_connection_includes_endpoint() {
        let oss = PrefixedOssConnection {
            oss_endpoint: "https://oss.example.com".to_string(),
            oss_access_key_id: secret("key_id"),
            oss_access_key_secret: secret("key_secret"),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            oss,
            ..Default::default()
        };

        let (clause, _secrets) = build_oss_connection(&storage);

        assert!(clause.contains("ENDPOINT='https://oss.example.com'"));
    }

    /// An inline credential, scope and endpoint are all emitted; the
    /// credential path option never is.
    #[test]
    fn test_build_gcs_connection_uses_scope_and_inline_credential() {
        let gcs = PrefixedGcsConnection {
            gcs_scope: "scope-a".to_string(),
            gcs_endpoint: "https://storage.googleapis.com".to_string(),
            gcs_credential: secret("credential-json"),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            gcs,
            ..Default::default()
        };

        let (clause, _secrets) = build_gcs_connection(&storage, "gcs://bucket/root").unwrap();

        assert!(clause.contains("CREDENTIAL='credential-json'"));
        assert!(clause.contains("SCOPE='scope-a'"));
        assert!(clause.contains("ENDPOINT='https://storage.googleapis.com'"));
        assert!(!clause.contains("CREDENTIAL_PATH"));
    }

    /// Supplying only a credential path is an error rather than being ignored.
    #[test]
    fn test_build_gcs_connection_rejects_credential_path_only() {
        let gcs = PrefixedGcsConnection {
            gcs_scope: "scope-a".to_string(),
            gcs_credential_path: secret("/tmp/creds.json"),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            gcs,
            ..Default::default()
        };

        let message = build_gcs_connection(&storage, "gcs://bucket/root")
            .expect_err("credential_path-only should be rejected")
            .to_string();

        assert!(message.contains("gcs_credential_path is not supported"));
    }

    /// A configured Azure Blob endpoint is forwarded in the connection clause.
    #[test]
    fn test_build_azblob_connection_includes_endpoint() {
        let azblob = PrefixedAzblobConnection {
            azblob_account_name: secret("account"),
            azblob_account_key: secret("key"),
            azblob_endpoint: "https://blob.example.com".to_string(),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            azblob,
            ..Default::default()
        };

        let (clause, _secrets) = build_azblob_connection(&storage);

        assert!(clause.contains("ENDPOINT='https://blob.example.com'"));
    }

    /// The SAS token is sent in the clause but redacted once masked.
    #[test]
    fn test_build_azblob_connection_redacts_sas_token() {
        let azblob = PrefixedAzblobConnection {
            azblob_account_name: secret("account"),
            azblob_account_key: secret("key"),
            azblob_sas_token: Some("sig=secret-token".to_string()),
            ..Default::default()
        };
        let storage = ObjectStoreConfig {
            azblob,
            ..Default::default()
        };

        let (clause, secrets) = build_azblob_connection(&storage);
        let redacted = mask_secrets(&clause, &secrets);

        assert!(clause.contains("SAS_TOKEN='sig=secret-token'"));
        assert!(redacted.contains("SAS_TOKEN='[REDACTED]'"));
        assert!(!redacted.contains("sig=secret-token"));
    }

    /// A secret containing a quote is matched in its SQL-escaped spelling.
    #[test]
    fn test_mask_secrets_redacts_sql_escaped_literals() {
        let statement =
            "COPY DATABASE \"greptime\".\"public\" TO 's3://bucket' CONNECTION (SECRET='ab''cd');";

        let redacted = mask_secrets(statement, &[Some("ab'cd".to_string())]);

        assert!(!redacted.contains("ab'cd"));
        assert!(!redacted.contains("ab''cd"));
        assert!(redacted.contains("SECRET='[REDACTED]'"));
    }

    /// Percent-encoding in a `file://` URI is decoded in the resulting location.
    #[test]
    fn test_build_copy_target_decodes_file_uri_path() {
        let storage = ObjectStoreConfig::default();
        let backup_dir = create_temp_dir("my backup");
        let uri = Url::from_file_path(backup_dir.path())
            .expect("absolute platform path should convert to file:// URI")
            .to_string();
        let chunk_dir = data_dir_for_schema_chunk("public", 7);
        let expected = normalize_path(&format!(
            "{}/{}",
            backup_dir.path().to_string_lossy(),
            chunk_dir
        ));

        let target = build_copy_target(&uri, &storage, "public", 7)
            .expect("file:// copy target should be built");

        assert!(uri.contains("%20"));
        assert!(!target.location.contains("%20"));
        assert!(target.location.contains("my backup"));
        assert_eq!(target.location, expected);
    }
}