cli/data/export.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{
    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
    SchemaNotFoundSnafu,
};
use crate::{database, Tool};

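/// A fully qualified table name: `(catalog, schema, table)`.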
type TableReference = (String, String, String);

#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
    Schema,
    /// Export all table data, corresponding to `COPY DATABASE TO`.
    Data,
    /// Export all table schemas and data at once.
    #[default]
    All,
}

/// Command for exporting data from GreptimeDB.
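///
/// A typical local export might look like the following (invocation sketch;
/// the binary name, address, and port depend on the installation):
///
/// ```text
/// greptime cli export --addr 127.0.0.1:4000 --output-dir /tmp/greptimedb-export --target all
/// ```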
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect to.
    #[clap(long)]
    addr: String,

    /// Directory to put the exported data in, e.g. /tmp/greptimedb-export
    /// for a local export.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the catalog to export.
    #[clap(long, default_value = "greptime-*")]
    database: String,

    /// Parallelism of the export.
    #[clap(long, short = 'j', default_value = "1")]
    export_jobs: usize,

    /// Max retry times for each job.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Things to export.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// A half-open time range: [start_time, end_time).
    /// The start of the time range (time-index column) for data export.
    #[clap(long)]
    start_time: Option<String>,

    /// A half-open time range: [start_time, end_time).
    /// The end of the time range (time-index column) for data export.
    #[clap(long)]
    end_time: Option<String>,

    /// The basic authentication for connecting to the server.
    #[clap(long)]
    auth_basic: Option<String>,

    /// The timeout for invoking the database.
    ///
    /// It is used to override the server-side timeout setting.
    /// By default, the server-side default timeout is disabled (i.e. `0s`).
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// The proxy server address to connect to; if set, it overrides the system proxy.
    ///
    /// By default, the system proxy is used if neither `proxy` nor `no_proxy` is set.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable the proxy server; if set, no proxy is used at all.
    #[clap(long)]
    no_proxy: bool,

    /// Whether to export data to S3.
    #[clap(long)]
    s3: bool,

    /// If both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` is used only
    /// for the exported SQL files, and the data is exported to remote storage.
    ///
    /// Note that `ddl_local_dir` exports SQL files to the **LOCAL** file system; this is useful
    /// when the export client has no direct access to remote storage.
    ///
    /// If remote storage is set but `ddl_local_dir` is not, both SQL files and data are exported
    /// to remote storage.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    /// The S3 bucket name.
    /// Required when `s3` is set.
    #[clap(long)]
    s3_bucket: Option<String>,

    /// The S3 root path.
    /// Optional; unlike the other S3 options, it is not validated when `s3` is set.
    #[clap(long)]
    s3_root: Option<String>,

    /// The S3 endpoint.
    /// Required when `s3` is set.
    #[clap(long)]
    s3_endpoint: Option<String>,

    /// The S3 access key.
    /// Required when `s3` is set.
    #[clap(long)]
    s3_access_key: Option<String>,

    /// The S3 secret key.
    /// Required when `s3` is set.
    #[clap(long)]
    s3_secret_key: Option<String>,

    /// The S3 region.
    /// Required when `s3` is set.
    #[clap(long)]
    s3_region: Option<String>,

    /// Whether to export data to OSS.
    #[clap(long)]
    oss: bool,

    /// The OSS bucket name.
    /// Required when `oss` is set.
    #[clap(long)]
    oss_bucket: Option<String>,

    /// The OSS endpoint.
    /// Required when `oss` is set.
    #[clap(long)]
    oss_endpoint: Option<String>,

    /// The OSS access key id.
    /// Required when `oss` is set.
    #[clap(long)]
    oss_access_key_id: Option<String>,

    /// The OSS access key secret.
    /// Required when `oss` is set.
    #[clap(long)]
    oss_access_key_secret: Option<String>,
}

impl ExportCommand {
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        if self.s3
            && (self.s3_bucket.is_none()
                || self.s3_endpoint.is_none()
                || self.s3_access_key.is_none()
                || self.s3_secret_key.is_none()
                || self.s3_region.is_none())
        {
            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
        }
        if !self.s3 && !self.oss && self.output_dir.is_none() {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        }
        let (catalog, schema) =
            database::split_database(&self.database).map_err(BoxedError::new)?;
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            catalog.clone(),
            self.auth_basic.clone(),
            // Treats `None` as `0s` to disable the server-side default timeout.
            self.timeout.unwrap_or_default(),
            proxy,
        );

        Ok(Box::new(Export {
            catalog,
            schema,
            database_client,
            output_dir: self.output_dir.clone(),
            parallelism: self.export_jobs,
            target: self.target.clone(),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            s3: self.s3,
            ddl_local_dir: self.ddl_local_dir.clone(),
            s3_bucket: self.s3_bucket.clone(),
            s3_root: self.s3_root.clone(),
            s3_endpoint: self.s3_endpoint.clone(),
            // Wrap sensitive values in SecretString.
            s3_access_key: self
                .s3_access_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_secret_key: self
                .s3_secret_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_region: self.s3_region.clone(),
            oss: self.oss,
            oss_bucket: self.oss_bucket.clone(),
            oss_endpoint: self.oss_endpoint.clone(),
            // Wrap sensitive values in SecretString.
            oss_access_key_id: self
                .oss_access_key_id
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            oss_access_key_secret: self
                .oss_access_key_secret
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
        }))
    }
}

#[derive(Clone)]
pub struct Export {
    catalog: String,
    schema: Option<String>,
    database_client: DatabaseClient,
    output_dir: Option<String>,
    parallelism: usize,
    target: ExportTarget,
    start_time: Option<String>,
    end_time: Option<String>,
    s3: bool,
    ddl_local_dir: Option<String>,
    s3_bucket: Option<String>,
    s3_root: Option<String>,
    s3_endpoint: Option<String>,
    // SecretString keeps credentials from leaking via Debug output or logs.
    s3_access_key: Option<SecretString>,
    s3_secret_key: Option<SecretString>,
    s3_region: Option<String>,
    oss: bool,
    oss_bucket: Option<String>,
    oss_endpoint: Option<String>,
    // SecretString keeps credentials from leaking via Debug output or logs.
    oss_access_key_id: Option<SecretString>,
    oss_access_key_secret: Option<SecretString>,
}

impl Export {
    fn catalog_path(&self) -> PathBuf {
        if self.s3 || self.oss {
            PathBuf::from(&self.catalog)
        } else if let Some(dir) = &self.output_dir {
            PathBuf::from(dir).join(&self.catalog)
        } else {
            unreachable!("catalog_path: output_dir must be set when not using remote storage")
        }
    }

    async fn get_db_names(&self) -> Result<Vec<String>> {
        let db_names = self.all_db_names().await?;
        let Some(schema) = &self.schema else {
            return Ok(db_names);
        };

        // Check if the schema exists.
        db_names
            .into_iter()
            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
            .map(|name| vec![name])
            .context(SchemaNotFoundSnafu {
                catalog: &self.catalog,
                schema,
            })
    }

    /// Iterate over all db names.
    async fn all_db_names(&self) -> Result<Vec<String>> {
        let records = self
            .database_client
            .sql_in_public("SHOW DATABASES")
            .await?
            .context(EmptyResultSnafu)?;
        let mut result = Vec::with_capacity(records.len());
        for value in records {
            let Value::String(schema) = &value[0] else {
                unreachable!()
            };
            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                continue;
            }
            if schema == common_catalog::consts::PG_CATALOG_NAME {
                continue;
            }
            result.push(schema.clone());
        }
        Ok(result)
    }

    /// Return a list of [`TableReference`] to be exported.
    /// Includes all tables under the given `catalog` and `schema`.
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Fetch all metric physical tables first; they are identified by the
        // internal `__tsid` column.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
            FROM information_schema.columns \
            WHERE column_name = '__tsid' \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            let mut t = Vec::with_capacity(3);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
            FROM information_schema.tables \
            WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Physical tables are already collected above, so skip them here.
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }

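    /// Run `SHOW CREATE <show_type>` (e.g. `DATABASE`, `TABLE`, `VIEW`) for the
    /// given object and return the create statement, terminated by `;\n`.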
    async fn show_create(
        &self,
        show_type: &str,
        catalog: &str,
        schema: &str,
        table: Option<&str>,
    ) -> Result<String> {
        let sql = match table {
            Some(table) => format!(
                r#"SHOW CREATE {} "{}"."{}"."{}""#,
                show_type, catalog, schema, table
            ),
            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
        };
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let Value::String(create) = &records[0][1] else {
            unreachable!()
        };

        Ok(format!("{};\n", create))
    }

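    /// Export one `create_database.sql` file per database, built from
    /// `SHOW CREATE DATABASE`.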
    async fn export_create_database(&self) -> Result<()> {
        let timer = Instant::now();
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = self.build_prefer_fs_operator().await?;

        for schema in db_names {
            let create_database = self
                .show_create("DATABASE", &self.catalog, &schema, None)
                .await?;

            let file_path = self.get_file_path(&schema, "create_database.sql");
            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
                .await?;

            info!(
                "Exported {}.{} database creation SQL to {}",
                self.catalog,
                schema,
                self.format_output_path(&file_path)
            );
        }

        let elapsed = timer.elapsed();
        info!("Success {db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

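    /// Export one `create_tables.sql` file per database, containing the
    /// `CREATE TABLE`/`CREATE VIEW` statements for all of its tables and views.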
    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // Create the directory if needed for file system storage.
                if !export_self.s3 && !export_self.oss {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                // Add table creation SQL.
                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                // Add view creation SQL.
                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                // Write to storage.
                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

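    /// Build the operator for the configured storage backend (S3, OSS, or the
    /// local file system).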
    async fn build_operator(&self) -> Result<ObjectStore> {
        if self.s3 {
            self.build_s3_operator().await
        } else if self.oss {
            self.build_oss_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

    /// Build an operator that prefers the local file system: when
    /// `ddl_local_dir` is set alongside remote storage, files written through
    /// this operator go to the local directory instead of the remote backend.
    async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
        if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
            let root = self.ddl_local_dir.as_ref().unwrap().clone();
            let op = ObjectStore::new(services::Fs::default().root(&root))
                .context(OpenDalSnafu)?
                .layer(LoggingLayer::default())
                .finish();
            Ok(op)
        } else if self.s3 {
            self.build_s3_operator().await
        } else if self.oss {
            self.build_oss_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

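    /// Build an [`ObjectStore`] backed by S3 from the command-line options.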
    async fn build_s3_operator(&self) -> Result<ObjectStore> {
        let mut builder = services::S3::default().bucket(
            self.s3_bucket
                .as_ref()
                .expect("s3_bucket must be provided when s3 is enabled"),
        );

        if let Some(root) = self.s3_root.as_ref() {
            builder = builder.root(root);
        }

        if let Some(endpoint) = self.s3_endpoint.as_ref() {
            builder = builder.endpoint(endpoint);
        }

        if let Some(region) = self.s3_region.as_ref() {
            builder = builder.region(region);
        }

        if let Some(key_id) = self.s3_access_key.as_ref() {
            builder = builder.access_key_id(key_id.expose_secret());
        }

        if let Some(secret_key) = self.s3_secret_key.as_ref() {
            builder = builder.secret_access_key(secret_key.expose_secret());
        }

        let op = ObjectStore::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

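    /// Build an [`ObjectStore`] backed by OSS from the command-line options.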
    async fn build_oss_operator(&self) -> Result<ObjectStore> {
        let mut builder = Oss::default()
            .bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
            .endpoint(
                self.oss_endpoint
                    .as_ref()
                    .expect("oss_endpoint must be set"),
            );

        // Use expose_secret() to access the actual secret values.
        if let Some(key_id) = self.oss_access_key_id.as_ref() {
            builder = builder.access_key_id(key_id.expose_secret());
        }
        if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
            builder = builder.access_key_secret(secret_key.expose_secret());
        }

        let op = ObjectStore::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

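    /// Build an [`ObjectStore`] backed by the local file system, rooted at
    /// `output_dir`.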
    async fn build_fs_operator(&self) -> Result<ObjectStore> {
        let root = self
            .output_dir
            .as_ref()
            .context(OutputDirNotSetSnafu)?
            .clone();
        let op = ObjectStore::new(services::Fs::default().root(&root))
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

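    /// Export the data of every database via `COPY DATABASE TO`, and write a
    /// matching `copy_from.sql` that can re-import it.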
    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // Create the directory if not using remote storage.
                if !export_self.s3 && !export_self.oss {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self.get_storage_params(&schema);

                // Execute the COPY DATABASE TO command.
                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                // Log the SQL command but mask sensitive information.
                let safe_sql = export_self.mask_sensitive_sql(&sql);
                info!("Executing sql: {}", safe_sql);

                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Create the copy_from.sql file.
                let copy_database_from_sql = format!(
                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Mask sensitive information in SQL commands for safe logging.
    fn mask_sensitive_sql(&self, sql: &str) -> String {
        let mut masked_sql = sql.to_string();

        // Mask S3 credentials.
        if let Some(access_key) = &self.s3_access_key {
            masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
        }
        if let Some(secret_key) = &self.s3_secret_key {
            masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
        }

        // Mask OSS credentials.
        if let Some(access_key_id) = &self.oss_access_key_id {
            masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
        }
        if let Some(access_key_secret) = &self.oss_access_key_secret {
            masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
        }

        masked_sql
    }

    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
        format!("{}/{}/{}", self.catalog, schema, file_name)
    }

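    /// Render a user-facing location (`s3://…`, `oss://…`, or a local path)
    /// for the given relative file path, for logging purposes.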
    fn format_output_path(&self, file_path: &str) -> String {
        if self.s3 {
            format!(
                "s3://{}{}/{}",
                self.s3_bucket.as_ref().unwrap_or(&String::new()),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                file_path
            )
        } else if self.oss {
            // `file_path` already starts with the catalog, so it is not
            // prepended again here.
            format!(
                "oss://{}/{}",
                self.oss_bucket.as_ref().unwrap_or(&String::new()),
                file_path
            )
        } else {
            format!(
                "{}/{}",
                self.output_dir.as_ref().unwrap_or(&String::new()),
                file_path
            )
        }
    }

    async fn write_to_storage(
        &self,
        op: &ObjectStore,
        file_path: &str,
        content: Vec<u8>,
    ) -> Result<()> {
        op.write(file_path, content)
            .await
            .context(OpenDalSnafu)
            .map(|_| ())
    }

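    /// Return the `COPY DATABASE` target path and its trailing
    /// ` CONNECTION (...)` clause (empty for local exports).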
    fn get_storage_params(&self, schema: &str) -> (String, String) {
        if self.s3 {
            let s3_path = format!(
                "s3://{}{}/{}/{}/",
                // Safety: s3_bucket is required when s3 is enabled.
                self.s3_bucket.as_ref().unwrap(),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                self.catalog,
                schema
            );

            // The endpoint is optional.
            let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            // Safety: the required S3 options are validated in `build`.
            // Use expose_secret() to access the actual secret values.
            let connection_options = format!(
                "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
                self.s3_access_key.as_ref().unwrap().expose_secret(),
                self.s3_secret_key.as_ref().unwrap().expose_secret(),
                self.s3_region.as_ref().unwrap(),
                endpoint_option
            );

            (s3_path, format!(" CONNECTION ({})", connection_options))
        } else if self.oss {
            let oss_path = format!(
                "oss://{}/{}/{}/",
                self.oss_bucket.as_ref().unwrap(),
                self.catalog,
                schema
            );
            let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            let connection_options = format!(
                "ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
                self.oss_access_key_id.as_ref().unwrap().expose_secret(),
                self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
                endpoint_option
            );
            (oss_path, format!(" CONNECTION ({})", connection_options))
        } else {
            (
                self.catalog_path()
                    .join(format!("{schema}/"))
                    .to_string_lossy()
                    .to_string(),
                String::new(),
            )
        }
    }

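    /// Await all export tasks, log any failures, and return the number of
    /// successful jobs.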
    async fn execute_tasks(
        &self,
        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
    ) -> usize {
        futures::future::join_all(tasks)
            .await
            .into_iter()
            .filter(|r| match r {
                Ok(_) => true,
                Err(e) => {
                    error!(e; "export job failed");
                    false
                }
            })
            .count()
    }
}

#[async_trait]
impl Tool for Export {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.target {
            ExportTarget::Schema => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)
            }
            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
            ExportTarget::All => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)?;
                self.export_database_data().await.map_err(BoxedError::new)
            }
        }
    }
}

/// Build the WITH options string for the `COPY DATABASE` statements; the same
/// syntax is used for remote (S3/OSS) and local exports.
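/// For example, with both bounds set it yields
/// `format = 'parquet', start_time = '<start>', end_time = '<end>'`.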
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.join(", ")
}
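
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of a test for the pure helper above; the timestamps
    // are illustrative values, not significant.
    #[test]
    fn with_options_includes_time_range() {
        assert_eq!(build_with_options(&None, &None), "format = 'parquet'");
        assert_eq!(
            build_with_options(
                &Some("2024-01-01 00:00:00".to_string()),
                &Some("2024-02-01 00:00:00".to_string())
            ),
            "format = 'parquet', start_time = '2024-01-01 00:00:00', end_time = '2024-02-01 00:00:00'"
        );
    }
}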