cli/data/export.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{ObjectStore, services};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::error::{
    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
    SchemaNotFoundSnafu,
};
use crate::{Tool, database};

type TableReference = (String, String, String);

#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
    Schema,
    /// Export all table data, corresponding to `COPY DATABASE TO`.
    Data,
    /// Export all table schemas and data at once.
    #[default]
    All,
}

/// Command for exporting data from GreptimeDB.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect to.
    #[clap(long)]
    addr: String,

    /// Directory to put the exported data in, e.g. `/tmp/greptimedb-export`
    /// for a local export.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the catalog to export.
    #[clap(long, default_value_t = default_database())]
    database: String,

    /// The number of databases exported in parallel.
    /// For example, if there are 20 databases and `db_parallelism` is 4,
    /// 4 databases will be exported concurrently.
    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
    db_parallelism: usize,

    /// The number of tables exported in parallel within a single database.
    /// For example, if a database has 30 tables and `table_parallelism` is 8,
    /// 8 tables will be exported concurrently.
    #[clap(long, default_value = "4")]
    table_parallelism: usize,

    /// The maximum number of retries for each job.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Things to export.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// The start of the half-open time range `[start_time, end_time)`
    /// (over the time-index column) for data export.
    #[clap(long)]
    start_time: Option<String>,

    /// The end of the half-open time range `[start_time, end_time)`
    /// (over the time-index column) for data export.
    #[clap(long)]
    end_time: Option<String>,

    /// The basic authentication for connecting to the server.
    #[clap(long)]
    auth_basic: Option<String>,

    /// The timeout for invoking the database.
    ///
    /// It is used to override the server-side timeout setting.
    /// By default, the server-side default timeout is disabled (i.e. `0s`).
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// The proxy server address to connect to. If set, it overrides the
    /// system proxy.
    ///
    /// By default, the system proxy is used if neither `proxy` nor
    /// `no_proxy` is set.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable proxying. If set, no proxy is used at all.
    #[clap(long)]
    no_proxy: bool,

    /// Whether to export data to S3.
    #[clap(long)]
    s3: bool,

    /// If both `ddl_local_dir` and remote storage (S3/OSS) are set,
    /// `ddl_local_dir` is used only for the exported SQL files, while the
    /// data is exported to remote storage.
    ///
    /// Note that `ddl_local_dir` writes SQL files to the **local** file
    /// system; this is useful if the export client doesn't have direct
    /// access to remote storage.
    ///
    /// If remote storage is set but `ddl_local_dir` is not, both the SQL
    /// files and the data are exported to remote storage.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    /// The S3 bucket name.
    /// Required if `s3` is set.
    #[clap(long)]
    s3_bucket: Option<String>,

    /// The S3 root path (optional).
    #[clap(long)]
    s3_root: Option<String>,

    /// The S3 endpoint.
    /// Required if `s3` is set.
    #[clap(long)]
    s3_endpoint: Option<String>,

    /// The S3 access key.
    /// Required if `s3` is set.
    #[clap(long)]
    s3_access_key: Option<String>,

    /// The S3 secret key.
    /// Required if `s3` is set.
    #[clap(long)]
    s3_secret_key: Option<String>,

    /// The S3 region.
    /// Required if `s3` is set.
    #[clap(long)]
    s3_region: Option<String>,

    /// Whether to export data to OSS.
    #[clap(long)]
    oss: bool,

    /// The OSS bucket name.
    /// Required if `oss` is set.
    #[clap(long)]
    oss_bucket: Option<String>,

    /// The OSS endpoint.
    /// Required if `oss` is set.
    #[clap(long)]
    oss_endpoint: Option<String>,

    /// The OSS access key id.
    /// Required if `oss` is set.
    #[clap(long)]
    oss_access_key_id: Option<String>,

    /// The OSS access key secret.
    /// Required if `oss` is set.
    #[clap(long)]
    oss_access_key_secret: Option<String>,
}
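
// A hedged usage sketch. The long/short flags below are derived by clap from
// the fields above; the surrounding binary and subcommand path are an
// assumption, not defined in this file:
//
//   <greptime-cli> export --addr 127.0.0.1:4000 \
//       --output-dir /tmp/greptimedb-export --target all -j 4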

impl ExportCommand {
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        if self.s3
            && (self.s3_bucket.is_none()
                || self.s3_endpoint.is_none()
                || self.s3_access_key.is_none()
                || self.s3_secret_key.is_none()
                || self.s3_region.is_none())
        {
            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
        }
        if !self.s3 && !self.oss && self.output_dir.is_none() {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        }
        let (catalog, schema) =
            database::split_database(&self.database).map_err(BoxedError::new)?;
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            catalog.clone(),
            self.auth_basic.clone(),
            // Treats `None` as `0s` to disable server-side default timeout.
            self.timeout.unwrap_or_default(),
            proxy,
        );

        Ok(Box::new(Export {
            catalog,
            schema,
            database_client,
            output_dir: self.output_dir.clone(),
            export_jobs: self.db_parallelism,
            target: self.target.clone(),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            parallelism: self.table_parallelism,
            s3: self.s3,
            ddl_local_dir: self.ddl_local_dir.clone(),
            s3_bucket: self.s3_bucket.clone(),
            s3_root: self.s3_root.clone(),
            s3_endpoint: self.s3_endpoint.clone(),
            // Wrap sensitive values in SecretString
            s3_access_key: self
                .s3_access_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_secret_key: self
                .s3_secret_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_region: self.s3_region.clone(),
            oss: self.oss,
            oss_bucket: self.oss_bucket.clone(),
            oss_endpoint: self.oss_endpoint.clone(),
            // Wrap sensitive values in SecretString
            oss_access_key_id: self
                .oss_access_key_id
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            oss_access_key_secret: self
                .oss_access_key_secret
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
        }))
    }
}

#[derive(Clone)]
pub struct Export {
    catalog: String,
    schema: Option<String>,
    database_client: DatabaseClient,
    output_dir: Option<String>,
    export_jobs: usize,
    target: ExportTarget,
    start_time: Option<String>,
    end_time: Option<String>,
    parallelism: usize,
    s3: bool,
    ddl_local_dir: Option<String>,
    s3_bucket: Option<String>,
    s3_root: Option<String>,
    s3_endpoint: Option<String>,
    // Sensitive values are held as `SecretString`.
    s3_access_key: Option<SecretString>,
    s3_secret_key: Option<SecretString>,
    s3_region: Option<String>,
    oss: bool,
    oss_bucket: Option<String>,
    oss_endpoint: Option<String>,
    // Sensitive values are held as `SecretString`.
    oss_access_key_id: Option<SecretString>,
    oss_access_key_secret: Option<SecretString>,
}

impl Export {
    fn catalog_path(&self) -> PathBuf {
        if self.s3 || self.oss {
            PathBuf::from(&self.catalog)
        } else if let Some(dir) = &self.output_dir {
            PathBuf::from(dir).join(&self.catalog)
        } else {
            unreachable!("catalog_path: output_dir must be set when not using remote storage")
        }
    }
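
    // For example, with catalog "greptime": a local export with
    // `--output-dir /tmp/export` yields "/tmp/export/greptime", while an
    // S3/OSS export yields the bucket-relative path "greptime".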

    async fn get_db_names(&self) -> Result<Vec<String>> {
        let db_names = self.all_db_names().await?;
        let Some(schema) = &self.schema else {
            return Ok(db_names);
        };

        // Check that the requested schema exists
        db_names
            .into_iter()
            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
            .map(|name| vec![name])
            .context(SchemaNotFoundSnafu {
                catalog: &self.catalog,
                schema,
            })
    }

    /// Return all database names, excluding the system schemas
    /// `information_schema` and `pg_catalog`.
    async fn all_db_names(&self) -> Result<Vec<String>> {
        let records = self
            .database_client
            .sql_in_public("SHOW DATABASES")
            .await?
            .context(EmptyResultSnafu)?;
        let mut result = Vec::with_capacity(records.len());
        for value in records {
            let Value::String(schema) = &value[0] else {
                unreachable!()
            };
            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                continue;
            }
            if schema == common_catalog::consts::PG_CATALOG_NAME {
                continue;
            }
            result.push(schema.clone());
        }
        Ok(result)
    }

    /// Return the lists of [`TableReference`]s to be exported:
    /// all tables and views under the given `catalog` and `schema`.
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Collect the metric physical tables first, identified by the
        // internal `__tsid` column.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
            FROM information_schema.columns \
            WHERE column_name = '__tsid' \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            let mut t = Vec::with_capacity(3);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
            FROM information_schema.tables \
            WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Skip entries already collected as metric physical tables
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }

    async fn show_create(
        &self,
        show_type: &str,
        catalog: &str,
        schema: &str,
        table: Option<&str>,
    ) -> Result<String> {
        let sql = match table {
            Some(table) => format!(
                r#"SHOW CREATE {} "{}"."{}"."{}""#,
                show_type, catalog, schema, table
            ),
            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
        };
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let Value::String(create) = &records[0][1] else {
            unreachable!()
        };

        Ok(format!("{};\n", create))
    }
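
    // Sketch of the round trip (hypothetical table name):
    // `show_create("TABLE", "greptime", "public", Some("metrics"))` issues
    //   SHOW CREATE TABLE "greptime"."public"."metrics"
    // and returns the DDL from the second column of the first row,
    // with ";\n" appended.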

    async fn export_create_database(&self) -> Result<()> {
        let timer = Instant::now();
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = self.build_prefer_fs_operator().await?;

        for schema in db_names {
            let create_database = self
                .show_create("DATABASE", &self.catalog, &schema, None)
                .await?;

            let file_path = self.get_file_path(&schema, "create_database.sql");
            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
                .await?;

            info!(
                "Exported {}.{} database creation SQL to {}",
                self.catalog,
                schema,
                self.format_output_path(&file_path)
            );
        }

        let elapsed = timer.elapsed();
        info!("Success {db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // Create directory if needed for file system storage
                if !export_self.s3 && !export_self.oss {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                // Add table creation SQL
                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                // Add view creation SQL
                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                // Write to storage
                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    async fn build_operator(&self) -> Result<ObjectStore> {
        if self.s3 {
            self.build_s3_operator().await
        } else if self.oss {
            self.build_oss_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

    /// Build an operator that prefers the local file system: if remote
    /// storage is enabled and `ddl_local_dir` is set, files are written to
    /// `ddl_local_dir` instead of to the remote storage.
    async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
        if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
            let root = self.ddl_local_dir.as_ref().unwrap().clone();
            let op = ObjectStore::new(services::Fs::default().root(&root))
                .context(OpenDalSnafu)?
                .layer(LoggingLayer::default())
                .finish();
            Ok(op)
        } else if self.s3 {
            self.build_s3_operator().await
        } else if self.oss {
            self.build_oss_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

    async fn build_s3_operator(&self) -> Result<ObjectStore> {
        let mut builder = services::S3::default().bucket(
            self.s3_bucket
                .as_ref()
                .expect("s3_bucket must be provided when s3 is enabled"),
        );

        if let Some(root) = self.s3_root.as_ref() {
            builder = builder.root(root);
        }

        if let Some(endpoint) = self.s3_endpoint.as_ref() {
            builder = builder.endpoint(endpoint);
        }

        if let Some(region) = self.s3_region.as_ref() {
            builder = builder.region(region);
        }

        if let Some(key_id) = self.s3_access_key.as_ref() {
            builder = builder.access_key_id(key_id.expose_secret());
        }

        if let Some(secret_key) = self.s3_secret_key.as_ref() {
            builder = builder.secret_access_key(secret_key.expose_secret());
        }

        let op = ObjectStore::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

    async fn build_oss_operator(&self) -> Result<ObjectStore> {
        let mut builder = Oss::default()
            .bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
            .endpoint(
                self.oss_endpoint
                    .as_ref()
                    .expect("oss_endpoint must be set"),
            );

        // Use expose_secret() to access the actual secret value
        if let Some(key_id) = self.oss_access_key_id.as_ref() {
            builder = builder.access_key_id(key_id.expose_secret());
        }
        if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
            builder = builder.access_key_secret(secret_key.expose_secret());
        }

        let op = ObjectStore::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

    async fn build_fs_operator(&self) -> Result<ObjectStore> {
        let root = self
            .output_dir
            .as_ref()
            .context(OutputDirNotSetSnafu)?
            .clone();
        let op = ObjectStore::new(services::Fs::default().root(&root))
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // Create directory if not using remote storage
                if !export_self.s3 && !export_self.oss {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self.get_storage_params(&schema);

                // Execute the `COPY DATABASE TO` command
                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                // Log the SQL command but mask sensitive information
                let safe_sql = export_self.mask_sensitive_sql(&sql);
                info!("Executing sql: {}", safe_sql);

                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Create the copy_from.sql file
                let copy_database_from_sql = {
                    let command_without_connection = format!(
                        r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
                        export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
                    );

                    if connection_part.is_empty() {
                        command_without_connection
                    } else {
                        let command_with_connection = format!(
                            r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                            export_self.catalog, schema, path, with_options_clone, connection_part
                        );

                        format!(
                            "-- {}\n{}",
                            command_with_connection, command_without_connection
                        )
                    }
                };

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Mask sensitive information in SQL commands for safe logging.
    fn mask_sensitive_sql(&self, sql: &str) -> String {
        let mut masked_sql = sql.to_string();

        // Mask S3 credentials
        if let Some(access_key) = &self.s3_access_key {
            masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
        }
        if let Some(secret_key) = &self.s3_secret_key {
            masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
        }

        // Mask OSS credentials
        if let Some(access_key_id) = &self.oss_access_key_id {
            masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
        }
        if let Some(access_key_secret) = &self.oss_access_key_secret {
            masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
        }

        masked_sql
    }
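
    // Sketch (hypothetical key): with `s3_access_key = Some("AKIDEXAMPLE")`,
    // a SQL string containing `ACCESS_KEY_ID='AKIDEXAMPLE'` is logged as
    // `ACCESS_KEY_ID='[REDACTED]'`.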

    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
        format!("{}/{}/{}", self.catalog, schema, file_name)
    }
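
    // For example, `get_file_path("public", "copy_from.sql")` yields
    // "greptime/public/copy_from.sql" when the catalog is "greptime".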

    fn format_output_path(&self, file_path: &str) -> String {
        if self.s3 {
            format!(
                "s3://{}{}/{}",
                self.s3_bucket.as_ref().unwrap_or(&String::new()),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                file_path
            )
        } else if self.oss {
            // `file_path` already starts with the catalog name, so it is not
            // prepended again here.
            format!(
                "oss://{}/{}",
                self.oss_bucket.as_ref().unwrap_or(&String::new()),
                file_path
            )
        } else {
            format!(
                "{}/{}",
                self.output_dir.as_ref().unwrap_or(&String::new()),
                file_path
            )
        }
    }
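
    // For example (hypothetical bucket/root), with `--s3-bucket my-bucket
    // --s3-root backup`, the file path "greptime/public/copy_from.sql" is
    // rendered as "s3://my-bucket/backup/greptime/public/copy_from.sql".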

    async fn write_to_storage(
        &self,
        op: &ObjectStore,
        file_path: &str,
        content: Vec<u8>,
    ) -> Result<()> {
        op.write(file_path, content)
            .await
            .context(OpenDalSnafu)
            .map(|_| ())
    }

    fn get_storage_params(&self, schema: &str) -> (String, String) {
        if self.s3 {
            let s3_path = format!(
                "s3://{}{}/{}/{}/",
                // Safety: s3_bucket is required when s3 is enabled
                self.s3_bucket.as_ref().unwrap(),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                self.catalog,
                schema
            );

            // The endpoint is optional
            let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            // Safety: the remaining s3 options are required when s3 is enabled.
            // Use expose_secret() to access the actual secret values.
            let connection_options = format!(
                "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
                self.s3_access_key.as_ref().unwrap().expose_secret(),
                self.s3_secret_key.as_ref().unwrap().expose_secret(),
                self.s3_region.as_ref().unwrap(),
                endpoint_option
            );

            (s3_path, format!(" CONNECTION ({})", connection_options))
        } else if self.oss {
            let oss_path = format!(
                "oss://{}/{}/{}/",
                self.oss_bucket.as_ref().unwrap(),
                self.catalog,
                schema
            );
            let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            let connection_options = format!(
                "ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
                self.oss_access_key_id.as_ref().unwrap().expose_secret(),
                self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
                endpoint_option
            );
            (oss_path, format!(" CONNECTION ({})", connection_options))
        } else {
            (
                self.catalog_path()
                    .join(format!("{schema}/"))
                    .to_string_lossy()
                    .to_string(),
                String::new(),
            )
        }
    }
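
    // Sketch of the s3 branch (hypothetical values): for schema "public",
    // this returns ("s3://my-bucket/backup/greptime/public/",
    // " CONNECTION (ACCESS_KEY_ID='...', SECRET_ACCESS_KEY='...', REGION='us-east-1')").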

    async fn execute_tasks(
        &self,
        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
    ) -> usize {
        futures::future::join_all(tasks)
            .await
            .into_iter()
            .filter(|r| match r {
                Ok(_) => true,
                Err(e) => {
                    error!(e; "export job failed");
                    false
                }
            })
            .count()
    }
}

#[async_trait]
impl Tool for Export {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.target {
            ExportTarget::Schema => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)
            }
            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
            ExportTarget::All => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)?;
                self.export_database_data().await.map_err(BoxedError::new)
            }
        }
    }
}

/// Builds the WITH options string for SQL commands, assuming consistent
/// syntax across S3 and local exports.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.push(format!("parallelism = {}", parallelism));
    options.join(", ")
}
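
// A minimal sanity-check sketch for `build_with_options`; the expected
// strings follow directly from the implementation above.
#[cfg(test)]
mod tests {
    use super::build_with_options;

    #[test]
    fn with_options_includes_only_set_bounds() {
        assert_eq!(
            build_with_options(&None, &None, 1),
            "format = 'parquet', parallelism = 1"
        );
        assert_eq!(
            build_with_options(&Some("2022-01-01 00:00:00".to_string()), &None, 8),
            "format = 'parquet', start_time = '2022-01-01 00:00:00', parallelism = 8"
        );
    }
}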