Skip to main content

cli/data/export_v2/
data.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_base::secrets::{ExposeSecret, SecretString};
16use common_telemetry::info;
17use object_store::util::{join_path, normalize_path};
18use snafu::ResultExt;
19use url::Url;
20
21use crate::common::ObjectStoreConfig;
22use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
23use crate::data::export_v2::manifest::{DataFormat, TimeRange};
24use crate::data::path::data_dir_for_schema_chunk;
25use crate::data::snapshot_storage::StorageScheme;
26use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
27use crate::database::DatabaseClient;
28
29pub(super) struct CopyOptions {
30    pub(super) format: DataFormat,
31    pub(super) time_range: TimeRange,
32    pub(super) parallelism: usize,
33}
34
35pub(super) struct CopyTarget {
36    pub(super) location: String,
37    pub(super) connection: String,
38    secrets: Vec<Option<String>>,
39}
40
41impl CopyTarget {
42    fn mask_sql(&self, sql: &str) -> String {
43        mask_secrets(sql, &self.secrets)
44    }
45}
46
47pub(super) fn build_copy_target(
48    snapshot_uri: &str,
49    storage: &ObjectStoreConfig,
50    schema: &str,
51    chunk_id: u32,
52) -> Result<CopyTarget> {
53    let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
54    let scheme = StorageScheme::from_uri(snapshot_uri)?;
55    let suffix = data_dir_for_schema_chunk(schema, chunk_id);
56
57    match scheme {
58        StorageScheme::File => {
59            let root = url.to_file_path().map_err(|_| {
60                InvalidUriSnafu {
61                    uri: snapshot_uri,
62                    reason: "file:// URI must use an absolute path like file:///tmp/backup",
63                }
64                .build()
65            })?;
66            let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
67            Ok(CopyTarget {
68                location,
69                connection: String::new(),
70                secrets: Vec::new(),
71            })
72        }
73        StorageScheme::S3 => {
74            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
75            let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
76            let (connection, secrets) = build_s3_connection(storage);
77            Ok(CopyTarget {
78                location,
79                connection,
80                secrets,
81            })
82        }
83        StorageScheme::Oss => {
84            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
85            let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
86            let (connection, secrets) = build_oss_connection(storage);
87            Ok(CopyTarget {
88                location,
89                connection,
90                secrets,
91            })
92        }
93        StorageScheme::Gcs => {
94            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
95            let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
96            let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
97            Ok(CopyTarget {
98                location,
99                connection,
100                secrets,
101            })
102        }
103        StorageScheme::Azblob => {
104            let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
105            let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
106            let (connection, secrets) = build_azblob_connection(storage);
107            Ok(CopyTarget {
108                location,
109                connection,
110                secrets,
111            })
112        }
113    }
114}
115
116pub(super) async fn execute_copy_database(
117    database_client: &DatabaseClient,
118    catalog: &str,
119    schema: &str,
120    target: &CopyTarget,
121    options: &CopyOptions,
122) -> Result<()> {
123    let with_options = build_with_options(options);
124    let sql = format!(
125        r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
126        escape_sql_identifier(catalog),
127        escape_sql_identifier(schema),
128        escape_sql_literal(&target.location),
129        with_options,
130        target.connection
131    );
132    let safe_sql = target.mask_sql(&sql);
133    info!("Executing sql: {}", safe_sql);
134    database_client
135        .sql_in_public(&sql)
136        .await
137        .context(DatabaseSnafu)?;
138    Ok(())
139}
140
141fn build_with_options(options: &CopyOptions) -> String {
142    let mut parts = vec![format!("FORMAT='{}'", options.format)];
143    if let Some(start) = options.time_range.start {
144        parts.push(format!(
145            "START_TIME='{}'",
146            escape_sql_literal(&start.to_rfc3339())
147        ));
148    }
149    if let Some(end) = options.time_range.end {
150        parts.push(format!(
151            "END_TIME='{}'",
152            escape_sql_literal(&end.to_rfc3339())
153        ));
154    }
155    parts.push(format!("PARALLELISM={}", options.parallelism));
156    parts.join(", ")
157}
158
159fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
160    let bucket = url.host_str().unwrap_or("").to_string();
161    if bucket.is_empty() {
162        return InvalidUriSnafu {
163            uri: snapshot_uri,
164            reason: "URI must include bucket/container in host",
165        }
166        .fail();
167    }
168    let root = url
169        .path()
170        .trim_start_matches('/')
171        .trim_end_matches('/')
172        .to_string();
173    Ok((bucket, root))
174}
175
176fn join_root(root: &str, suffix: &str) -> String {
177    join_path(root, suffix).trim_start_matches('/').to_string()
178}
179
180fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
181    let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
182    let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
183
184    let mut options = Vec::new();
185    if let Some(access_key_id) = &access_key_id {
186        options.push(format!(
187            "ACCESS_KEY_ID='{}'",
188            escape_sql_literal(access_key_id)
189        ));
190    }
191    if let Some(secret_access_key) = &secret_access_key {
192        options.push(format!(
193            "SECRET_ACCESS_KEY='{}'",
194            escape_sql_literal(secret_access_key)
195        ));
196    }
197    if let Some(region) = &storage.s3.s3_region {
198        options.push(format!("REGION='{}'", escape_sql_literal(region)));
199    }
200    if let Some(endpoint) = &storage.s3.s3_endpoint {
201        options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
202    }
203
204    let secrets = vec![access_key_id, secret_access_key];
205    let connection = if options.is_empty() {
206        String::new()
207    } else {
208        format!(" CONNECTION ({})", options.join(", "))
209    };
210    (connection, secrets)
211}
212
213fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
214    let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
215    let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
216
217    let mut options = Vec::new();
218    if let Some(access_key_id) = &access_key_id {
219        options.push(format!(
220            "ACCESS_KEY_ID='{}'",
221            escape_sql_literal(access_key_id)
222        ));
223    }
224    if let Some(access_key_secret) = &access_key_secret {
225        options.push(format!(
226            "ACCESS_KEY_SECRET='{}'",
227            escape_sql_literal(access_key_secret)
228        ));
229    }
230    if !storage.oss.oss_endpoint.is_empty() {
231        options.push(format!(
232            "ENDPOINT='{}'",
233            escape_sql_literal(&storage.oss.oss_endpoint)
234        ));
235    }
236
237    let secrets = vec![access_key_id, access_key_secret];
238    let connection = if options.is_empty() {
239        String::new()
240    } else {
241        format!(" CONNECTION ({})", options.join(", "))
242    };
243    (connection, secrets)
244}
245
246fn build_gcs_connection(
247    storage: &ObjectStoreConfig,
248    snapshot_uri: &str,
249) -> Result<(String, Vec<Option<String>>)> {
250    let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
251    let credential = expose_optional_secret(&storage.gcs.gcs_credential);
252
253    if credential.is_none() && credential_path.is_some() {
254        return InvalidUriSnafu {
255            uri: snapshot_uri,
256            reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
257        }
258        .fail();
259    }
260
261    let mut options = Vec::new();
262    if let Some(credential) = &credential {
263        options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
264    }
265    if !storage.gcs.gcs_scope.is_empty() {
266        options.push(format!(
267            "SCOPE='{}'",
268            escape_sql_literal(&storage.gcs.gcs_scope)
269        ));
270    }
271    if !storage.gcs.gcs_endpoint.is_empty() {
272        options.push(format!(
273            "ENDPOINT='{}'",
274            escape_sql_literal(&storage.gcs.gcs_endpoint)
275        ));
276    }
277
278    let connection = if options.is_empty() {
279        String::new()
280    } else {
281        format!(" CONNECTION ({})", options.join(", "))
282    };
283    let secrets = vec![credential_path, credential];
284    Ok((connection, secrets))
285}
286
287fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
288    let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
289    let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
290    let sas_token = storage.azblob.azblob_sas_token.clone();
291
292    let mut options = Vec::new();
293    if let Some(account_name) = &account_name {
294        options.push(format!(
295            "ACCOUNT_NAME='{}'",
296            escape_sql_literal(account_name)
297        ));
298    }
299    if let Some(account_key) = &account_key {
300        options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
301    }
302    if let Some(sas_token) = &sas_token {
303        options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
304    }
305    if !storage.azblob.azblob_endpoint.is_empty() {
306        options.push(format!(
307            "ENDPOINT='{}'",
308            escape_sql_literal(&storage.azblob.azblob_endpoint)
309        ));
310    }
311
312    let secrets = vec![account_name, account_key, sas_token];
313    let connection = if options.is_empty() {
314        String::new()
315    } else {
316        format!(" CONNECTION ({})", options.join(", "))
317    };
318    (connection, secrets)
319}
320
321fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
322    secret.as_ref().map(|s| s.expose_secret().to_owned())
323}
324
/// Returns `sql` with every occurrence of each non-empty secret replaced by
/// `[REDACTED]`.
///
/// `None` entries and empty strings are ignored. Secrets are replaced longest
/// first: if one secret is a substring of another (for example `abc` and
/// `abcdef`), replacing the shorter one first would break up the longer one
/// and leave its remainder (`def`) readable in the output.
fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
    let mut ordered: Vec<&str> = secrets
        .iter()
        .flatten()
        .map(String::as_str)
        .filter(|secret| !secret.is_empty())
        .collect();
    // Stable sort: secrets of equal length keep their original relative order.
    ordered.sort_by_key(|secret| std::cmp::Reverse(secret.len()));

    let mut masked = sql.to_string();
    for secret in ordered {
        masked = masked.replace(secret, "[REDACTED]");
    }
    masked
}
336
#[cfg(test)]
mod tests {
    use common_base::secrets::SecretString;
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;
    use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};

    /// Wraps a literal as an optional secret for fixture construction.
    fn secret(value: &str) -> Option<SecretString> {
        Some(SecretString::from(value.to_string()))
    }

    /// A configured OSS endpoint must show up in the connection clause.
    #[test]
    fn test_build_oss_connection_includes_endpoint() {
        let oss = PrefixedOssConnection {
            oss_endpoint: "https://oss.example.com".to_string(),
            oss_access_key_id: secret("key_id"),
            oss_access_key_secret: secret("key_secret"),
            ..Default::default()
        };
        let config = ObjectStoreConfig {
            oss,
            ..Default::default()
        };

        let (clause, _) = build_oss_connection(&config);
        assert!(clause.contains("ENDPOINT='https://oss.example.com'"));
    }

    /// Inline credential, scope and endpoint are forwarded; no path option is emitted.
    #[test]
    fn test_build_gcs_connection_uses_scope_and_inline_credential() {
        let gcs = PrefixedGcsConnection {
            gcs_scope: "scope-a".to_string(),
            gcs_endpoint: "https://storage.googleapis.com".to_string(),
            gcs_credential: secret("credential-json"),
            ..Default::default()
        };
        let config = ObjectStoreConfig {
            gcs,
            ..Default::default()
        };

        let (clause, _) = build_gcs_connection(&config, "gcs://bucket/root").unwrap();
        assert!(clause.contains("CREDENTIAL='credential-json'"));
        assert!(clause.contains("SCOPE='scope-a'"));
        assert!(clause.contains("ENDPOINT='https://storage.googleapis.com'"));
        assert!(!clause.contains("CREDENTIAL_PATH"));
    }

    /// A credential path without an inline credential is rejected.
    #[test]
    fn test_build_gcs_connection_rejects_credential_path_only() {
        let gcs = PrefixedGcsConnection {
            gcs_scope: "scope-a".to_string(),
            gcs_credential_path: secret("/tmp/creds.json"),
            ..Default::default()
        };
        let config = ObjectStoreConfig {
            gcs,
            ..Default::default()
        };

        let message = build_gcs_connection(&config, "gcs://bucket/root")
            .expect_err("credential_path-only should be rejected")
            .to_string();
        assert!(message.contains("gcs_credential_path is not supported"));
    }

    /// A configured Azure Blob endpoint must show up in the connection clause.
    #[test]
    fn test_build_azblob_connection_includes_endpoint() {
        let azblob = PrefixedAzblobConnection {
            azblob_account_name: secret("account"),
            azblob_account_key: secret("key"),
            azblob_endpoint: "https://blob.example.com".to_string(),
            ..Default::default()
        };
        let config = ObjectStoreConfig {
            azblob,
            ..Default::default()
        };

        let (clause, _) = build_azblob_connection(&config);
        assert!(clause.contains("ENDPOINT='https://blob.example.com'"));
    }

    /// The SAS token is present in the raw clause and gone after masking.
    #[test]
    fn test_build_azblob_connection_redacts_sas_token() {
        let azblob = PrefixedAzblobConnection {
            azblob_account_name: secret("account"),
            azblob_account_key: secret("key"),
            azblob_sas_token: Some("sig=secret-token".to_string()),
            ..Default::default()
        };
        let config = ObjectStoreConfig {
            azblob,
            ..Default::default()
        };

        let (clause, secrets) = build_azblob_connection(&config);
        let redacted = mask_secrets(&clause, &secrets);

        assert!(clause.contains("SAS_TOKEN='sig=secret-token'"));
        assert!(redacted.contains("SAS_TOKEN='[REDACTED]'"));
        assert!(!redacted.contains("sig=secret-token"));
    }

    /// Percent-encoding in a `file://` URI is decoded in the resulting location.
    #[test]
    fn test_build_copy_target_decodes_file_uri_path() {
        let config = ObjectStoreConfig::default();
        let backup_dir = create_temp_dir("my backup");
        let uri = Url::from_file_path(backup_dir.path())
            .expect("absolute platform path should convert to file:// URI")
            .to_string();
        let joined = format!(
            "{}/{}",
            backup_dir.path().to_string_lossy(),
            data_dir_for_schema_chunk("public", 7)
        );
        let expected_location = normalize_path(&joined);

        let target = build_copy_target(&uri, &config, "public", 7)
            .expect("file:// copy target should be built");

        assert!(uri.contains("%20"));
        assert!(!target.location.contains("%20"));
        assert!(target.location.contains("my backup"));
        assert_eq!(target.location, expected_location);
    }
}
453}