1use std::collections::HashSet;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::{Parser, ValueEnum};
22use common_base::secrets::{ExposeSecret, SecretString};
23use common_error::ext::BoxedError;
24use common_telemetry::{debug, error, info};
25use object_store::layers::LoggingLayer;
26use object_store::services::Oss;
27use object_store::{ObjectStore, services};
28use serde_json::Value;
29use snafu::{OptionExt, ResultExt};
30use tokio::sync::Semaphore;
31use tokio::time::Instant;
32
33use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
34use crate::database::{DatabaseClient, parse_proxy_opts};
35use crate::error::{
36 EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
37 SchemaNotFoundSnafu,
38};
39use crate::{Tool, database};
40
/// A fully qualified table name as a `(catalog, schema, table)` triple.
type TableReference = (String, String, String);
42
/// Selects which artifacts an export run produces.
#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export only the `CREATE DATABASE` / `CREATE TABLE` / `CREATE VIEW` statements.
    Schema,
    /// Export only the data, via `COPY DATABASE ... TO`.
    Data,
    /// Export both the schema statements and the data (the default).
    #[default]
    All,
}
53
/// Command-line options of the export tool.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Address of the server to connect to.
    #[clap(long)]
    addr: String,

    /// Local directory that receives the exported files.
    /// Required unless `--s3` or `--oss` is given.
    #[clap(long)]
    output_dir: Option<String>,

    /// Name of the database to export. When it names no schema, every
    /// database of the catalog is exported.
    #[clap(long, default_value_t = default_database())]
    database: String,

    /// Maximum number of databases exported concurrently.
    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
    db_parallelism: usize,

    /// Value of the `parallelism` option passed to every `COPY DATABASE` statement.
    #[clap(long, default_value = "4")]
    table_parallelism: usize,

    /// Maximum number of retries.
    // NOTE(review): this option is not read anywhere in this file — confirm
    // whether it is still honoured.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// What to export: schema statements, data, or both.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// Forwarded verbatim as the `start_time` option of `COPY DATABASE`.
    #[clap(long)]
    start_time: Option<String>,

    /// Forwarded verbatim as the `end_time` option of `COPY DATABASE`.
    #[clap(long)]
    end_time: Option<String>,

    /// Basic authentication credentials forwarded to the database client.
    #[clap(long)]
    auth_basic: Option<String>,

    /// Request timeout in `humantime` syntax (for example `30s`).
    /// When omitted, a zero duration is handed to the database client.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// Proxy setting forwarded to the database client.
    #[clap(long)]
    proxy: Option<String>,

    /// Forwarded, together with `--proxy`, to the proxy option parser.
    #[clap(long)]
    no_proxy: bool,

    /// Export to S3. Requires the bucket, endpoint, access key, secret key
    /// and region options to be set as well.
    #[clap(long)]
    s3: bool,

    /// When exporting to S3 or OSS, write the generated SQL files to this
    /// local directory instead of the remote store.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    /// S3 bucket name.
    #[clap(long)]
    s3_bucket: Option<String>,

    /// Root path inside the S3 bucket.
    #[clap(long)]
    s3_root: Option<String>,

    /// S3 endpoint.
    #[clap(long)]
    s3_endpoint: Option<String>,

    /// S3 access key id.
    #[clap(long)]
    s3_access_key: Option<String>,

    /// S3 secret access key.
    #[clap(long)]
    s3_secret_key: Option<String>,

    /// S3 region.
    #[clap(long)]
    s3_region: Option<String>,

    /// Export to OSS. Only consulted when `--s3` is not set.
    #[clap(long)]
    oss: bool,

    /// OSS bucket name.
    #[clap(long)]
    oss_bucket: Option<String>,

    /// OSS endpoint.
    #[clap(long)]
    oss_endpoint: Option<String>,

    /// OSS access key id.
    #[clap(long)]
    oss_access_key_id: Option<String>,

    /// OSS access key secret.
    #[clap(long)]
    oss_access_key_secret: Option<String>,
}
189
190impl ExportCommand {
191 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
192 if self.s3
193 && (self.s3_bucket.is_none()
194 || self.s3_endpoint.is_none()
195 || self.s3_access_key.is_none()
196 || self.s3_secret_key.is_none()
197 || self.s3_region.is_none())
198 {
199 return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
200 }
201 if !self.s3 && !self.oss && self.output_dir.is_none() {
202 return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
203 }
204 let (catalog, schema) =
205 database::split_database(&self.database).map_err(BoxedError::new)?;
206 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
207 let database_client = DatabaseClient::new(
208 self.addr.clone(),
209 catalog.clone(),
210 self.auth_basic.clone(),
211 self.timeout.unwrap_or_default(),
213 proxy,
214 );
215
216 Ok(Box::new(Export {
217 catalog,
218 schema,
219 database_client,
220 output_dir: self.output_dir.clone(),
221 export_jobs: self.db_parallelism,
222 target: self.target.clone(),
223 start_time: self.start_time.clone(),
224 end_time: self.end_time.clone(),
225 parallelism: self.table_parallelism,
226 s3: self.s3,
227 ddl_local_dir: self.ddl_local_dir.clone(),
228 s3_bucket: self.s3_bucket.clone(),
229 s3_root: self.s3_root.clone(),
230 s3_endpoint: self.s3_endpoint.clone(),
231 s3_access_key: self
233 .s3_access_key
234 .as_ref()
235 .map(|k| SecretString::from(k.clone())),
236 s3_secret_key: self
237 .s3_secret_key
238 .as_ref()
239 .map(|k| SecretString::from(k.clone())),
240 s3_region: self.s3_region.clone(),
241 oss: self.oss,
242 oss_bucket: self.oss_bucket.clone(),
243 oss_endpoint: self.oss_endpoint.clone(),
244 oss_access_key_id: self
246 .oss_access_key_id
247 .as_ref()
248 .map(|k| SecretString::from(k.clone())),
249 oss_access_key_secret: self
250 .oss_access_key_secret
251 .as_ref()
252 .map(|k| SecretString::from(k.clone())),
253 }))
254 }
255}
256
/// The export tool built from an [`ExportCommand`].
///
/// It is `Clone` because every per-database export job takes its own copy.
#[derive(Clone)]
pub struct Export {
    /// Catalog whose databases are exported.
    catalog: String,
    /// Single schema to export; `None` exports every database of the catalog.
    schema: Option<String>,
    /// Client used to run SQL against the server.
    database_client: DatabaseClient,
    /// Local output directory; required when neither `s3` nor `oss` is set.
    output_dir: Option<String>,
    /// Number of permits of the semaphore that limits concurrent database jobs.
    export_jobs: usize,
    /// Which artifacts to export.
    target: ExportTarget,
    /// Optional `start_time` option of the `COPY DATABASE` statements.
    start_time: Option<String>,
    /// Optional `end_time` option of the `COPY DATABASE` statements.
    end_time: Option<String>,
    /// `parallelism` option of the `COPY DATABASE` statements.
    parallelism: usize,
    /// Export to S3; takes precedence over `oss`.
    s3: bool,
    /// Local directory for the generated SQL files when exporting to S3/OSS.
    ddl_local_dir: Option<String>,
    /// S3 bucket name.
    s3_bucket: Option<String>,
    /// Root path inside the S3 bucket.
    s3_root: Option<String>,
    /// S3 endpoint.
    s3_endpoint: Option<String>,
    /// S3 access key id.
    s3_access_key: Option<SecretString>,
    /// S3 secret access key.
    s3_secret_key: Option<SecretString>,
    /// S3 region.
    s3_region: Option<String>,
    /// Export to OSS (only consulted when `s3` is false).
    oss: bool,
    /// OSS bucket name.
    oss_bucket: Option<String>,
    /// OSS endpoint.
    oss_endpoint: Option<String>,
    /// OSS access key id.
    oss_access_key_id: Option<SecretString>,
    /// OSS access key secret.
    oss_access_key_secret: Option<SecretString>,
}
284
285impl Export {
286 fn catalog_path(&self) -> PathBuf {
287 if self.s3 || self.oss {
288 PathBuf::from(&self.catalog)
289 } else if let Some(dir) = &self.output_dir {
290 PathBuf::from(dir).join(&self.catalog)
291 } else {
292 unreachable!("catalog_path: output_dir must be set when not using remote storage")
293 }
294 }
295
296 async fn get_db_names(&self) -> Result<Vec<String>> {
297 let db_names = self.all_db_names().await?;
298 let Some(schema) = &self.schema else {
299 return Ok(db_names);
300 };
301
302 db_names
304 .into_iter()
305 .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
306 .map(|name| vec![name])
307 .context(SchemaNotFoundSnafu {
308 catalog: &self.catalog,
309 schema,
310 })
311 }
312
313 async fn all_db_names(&self) -> Result<Vec<String>> {
315 let records = self
316 .database_client
317 .sql_in_public("SHOW DATABASES")
318 .await?
319 .context(EmptyResultSnafu)?;
320 let mut result = Vec::with_capacity(records.len());
321 for value in records {
322 let Value::String(schema) = &value[0] else {
323 unreachable!()
324 };
325 if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
326 continue;
327 }
328 if schema == common_catalog::consts::PG_CATALOG_NAME {
329 continue;
330 }
331 result.push(schema.clone());
332 }
333 Ok(result)
334 }
335
336 async fn get_table_list(
339 &self,
340 catalog: &str,
341 schema: &str,
342 ) -> Result<(
343 Vec<TableReference>,
344 Vec<TableReference>,
345 Vec<TableReference>,
346 )> {
347 let sql = format!(
349 "SELECT table_catalog, table_schema, table_name \
350 FROM information_schema.columns \
351 WHERE column_name = '__tsid' \
352 and table_catalog = \'{catalog}\' \
353 and table_schema = \'{schema}\'"
354 );
355 let records = self
356 .database_client
357 .sql_in_public(&sql)
358 .await?
359 .context(EmptyResultSnafu)?;
360 let mut metric_physical_tables = HashSet::with_capacity(records.len());
361 for value in records {
362 let mut t = Vec::with_capacity(3);
363 for v in &value {
364 let Value::String(value) = v else {
365 unreachable!()
366 };
367 t.push(value);
368 }
369 metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
370 }
371
372 let sql = format!(
373 "SELECT table_catalog, table_schema, table_name, table_type \
374 FROM information_schema.tables \
375 WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
376 and table_catalog = \'{catalog}\' \
377 and table_schema = \'{schema}\'",
378 );
379 let records = self
380 .database_client
381 .sql_in_public(&sql)
382 .await?
383 .context(EmptyResultSnafu)?;
384
385 debug!("Fetched table/view list: {:?}", records);
386
387 if records.is_empty() {
388 return Ok((vec![], vec![], vec![]));
389 }
390
391 let mut remaining_tables = Vec::with_capacity(records.len());
392 let mut views = Vec::new();
393 for value in records {
394 let mut t = Vec::with_capacity(4);
395 for v in &value {
396 let Value::String(value) = v else {
397 unreachable!()
398 };
399 t.push(value);
400 }
401 let table = (t[0].clone(), t[1].clone(), t[2].clone());
402 let table_type = t[3].as_str();
403 if !metric_physical_tables.contains(&table) {
405 if table_type == "VIEW" {
406 views.push(table);
407 } else {
408 remaining_tables.push(table);
409 }
410 }
411 }
412
413 Ok((
414 metric_physical_tables.into_iter().collect(),
415 remaining_tables,
416 views,
417 ))
418 }
419
420 async fn show_create(
421 &self,
422 show_type: &str,
423 catalog: &str,
424 schema: &str,
425 table: Option<&str>,
426 ) -> Result<String> {
427 let sql = match table {
428 Some(table) => format!(
429 r#"SHOW CREATE {} "{}"."{}"."{}""#,
430 show_type, catalog, schema, table
431 ),
432 None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
433 };
434 let records = self
435 .database_client
436 .sql_in_public(&sql)
437 .await?
438 .context(EmptyResultSnafu)?;
439 let Value::String(create) = &records[0][1] else {
440 unreachable!()
441 };
442
443 Ok(format!("{};\n", create))
444 }
445
446 async fn export_create_database(&self) -> Result<()> {
447 let timer = Instant::now();
448 let db_names = self.get_db_names().await?;
449 let db_count = db_names.len();
450 let operator = self.build_prefer_fs_operator().await?;
451
452 for schema in db_names {
453 let create_database = self
454 .show_create("DATABASE", &self.catalog, &schema, None)
455 .await?;
456
457 let file_path = self.get_file_path(&schema, "create_database.sql");
458 self.write_to_storage(&operator, &file_path, create_database.into_bytes())
459 .await?;
460
461 info!(
462 "Exported {}.{} database creation SQL to {}",
463 self.catalog,
464 schema,
465 self.format_output_path(&file_path)
466 );
467 }
468
469 let elapsed = timer.elapsed();
470 info!("Success {db_count} jobs, cost: {elapsed:?}");
471
472 Ok(())
473 }
474
475 async fn export_create_table(&self) -> Result<()> {
476 let timer = Instant::now();
477 let semaphore = Arc::new(Semaphore::new(self.export_jobs));
478 let db_names = self.get_db_names().await?;
479 let db_count = db_names.len();
480 let operator = Arc::new(self.build_prefer_fs_operator().await?);
481 let mut tasks = Vec::with_capacity(db_names.len());
482
483 for schema in db_names {
484 let semaphore_moved = semaphore.clone();
485 let export_self = self.clone();
486 let operator = operator.clone();
487 tasks.push(async move {
488 let _permit = semaphore_moved.acquire().await.unwrap();
489 let (metric_physical_tables, remaining_tables, views) = export_self
490 .get_table_list(&export_self.catalog, &schema)
491 .await?;
492
493 if !export_self.s3 && !export_self.oss {
495 let db_dir = format!("{}/{}/", export_self.catalog, schema);
496 operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
497 }
498
499 let file_path = export_self.get_file_path(&schema, "create_tables.sql");
500 let mut content = Vec::new();
501
502 for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
504 let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
505 content.extend_from_slice(create_table.as_bytes());
506 }
507
508 for (c, s, v) in &views {
510 let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
511 content.extend_from_slice(create_view.as_bytes());
512 }
513
514 export_self
516 .write_to_storage(&operator, &file_path, content)
517 .await?;
518
519 info!(
520 "Finished exporting {}.{schema} with {} table schemas to path: {}",
521 export_self.catalog,
522 metric_physical_tables.len() + remaining_tables.len() + views.len(),
523 export_self.format_output_path(&file_path)
524 );
525
526 Ok::<(), Error>(())
527 });
528 }
529
530 let success = self.execute_tasks(tasks).await;
531 let elapsed = timer.elapsed();
532 info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
533
534 Ok(())
535 }
536
537 async fn build_operator(&self) -> Result<ObjectStore> {
538 if self.s3 {
539 self.build_s3_operator().await
540 } else if self.oss {
541 self.build_oss_operator().await
542 } else {
543 self.build_fs_operator().await
544 }
545 }
546
547 async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
549 if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
550 let root = self.ddl_local_dir.as_ref().unwrap().clone();
551 let op = ObjectStore::new(services::Fs::default().root(&root))
552 .context(OpenDalSnafu)?
553 .layer(LoggingLayer::default())
554 .finish();
555 Ok(op)
556 } else if self.s3 {
557 self.build_s3_operator().await
558 } else if self.oss {
559 self.build_oss_operator().await
560 } else {
561 self.build_fs_operator().await
562 }
563 }
564
565 async fn build_s3_operator(&self) -> Result<ObjectStore> {
566 let mut builder = services::S3::default().bucket(
567 self.s3_bucket
568 .as_ref()
569 .expect("s3_bucket must be provided when s3 is enabled"),
570 );
571
572 if let Some(root) = self.s3_root.as_ref() {
573 builder = builder.root(root);
574 }
575
576 if let Some(endpoint) = self.s3_endpoint.as_ref() {
577 builder = builder.endpoint(endpoint);
578 }
579
580 if let Some(region) = self.s3_region.as_ref() {
581 builder = builder.region(region);
582 }
583
584 if let Some(key_id) = self.s3_access_key.as_ref() {
585 builder = builder.access_key_id(key_id.expose_secret());
586 }
587
588 if let Some(secret_key) = self.s3_secret_key.as_ref() {
589 builder = builder.secret_access_key(secret_key.expose_secret());
590 }
591
592 let op = ObjectStore::new(builder)
593 .context(OpenDalSnafu)?
594 .layer(LoggingLayer::default())
595 .finish();
596 Ok(op)
597 }
598
599 async fn build_oss_operator(&self) -> Result<ObjectStore> {
600 let mut builder = Oss::default()
601 .bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
602 .endpoint(
603 self.oss_endpoint
604 .as_ref()
605 .expect("oss_endpoint must be set"),
606 );
607
608 if let Some(key_id) = self.oss_access_key_id.as_ref() {
610 builder = builder.access_key_id(key_id.expose_secret());
611 }
612 if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
613 builder = builder.access_key_secret(secret_key.expose_secret());
614 }
615
616 let op = ObjectStore::new(builder)
617 .context(OpenDalSnafu)?
618 .layer(LoggingLayer::default())
619 .finish();
620 Ok(op)
621 }
622
623 async fn build_fs_operator(&self) -> Result<ObjectStore> {
624 let root = self
625 .output_dir
626 .as_ref()
627 .context(OutputDirNotSetSnafu)?
628 .clone();
629 let op = ObjectStore::new(services::Fs::default().root(&root))
630 .context(OpenDalSnafu)?
631 .layer(LoggingLayer::default())
632 .finish();
633 Ok(op)
634 }
635
636 async fn export_database_data(&self) -> Result<()> {
637 let timer = Instant::now();
638 let semaphore = Arc::new(Semaphore::new(self.export_jobs));
639 let db_names = self.get_db_names().await?;
640 let db_count = db_names.len();
641 let mut tasks = Vec::with_capacity(db_count);
642 let operator = Arc::new(self.build_operator().await?);
643 let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
644 let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);
645
646 for schema in db_names {
647 let semaphore_moved = semaphore.clone();
648 let export_self = self.clone();
649 let with_options_clone = with_options.clone();
650 let operator = operator.clone();
651 let fs_first_operator = fs_first_operator.clone();
652
653 tasks.push(async move {
654 let _permit = semaphore_moved.acquire().await.unwrap();
655
656 if !export_self.s3 && !export_self.oss {
658 let db_dir = format!("{}/{}/", export_self.catalog, schema);
659 operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
660 }
661
662 let (path, connection_part) = export_self.get_storage_params(&schema);
663
664 let sql = format!(
666 r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
667 export_self.catalog, schema, path, with_options_clone, connection_part
668 );
669
670 let safe_sql = export_self.mask_sensitive_sql(&sql);
672 info!("Executing sql: {}", safe_sql);
673
674 export_self.database_client.sql_in_public(&sql).await?;
675 info!(
676 "Finished exporting {}.{} data to {}",
677 export_self.catalog, schema, path
678 );
679
680 let copy_database_from_sql = {
682 let command_without_connection = format!(
683 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
684 export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
685 );
686
687 if connection_part.is_empty() {
688 command_without_connection
689 } else {
690 let command_with_connection = format!(
691 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
692 export_self.catalog, schema, path, with_options_clone, connection_part
693 );
694
695 format!(
696 "-- {}\n{}",
697 command_with_connection, command_without_connection
698 )
699 }
700 };
701
702 let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
703 export_self
704 .write_to_storage(
705 &fs_first_operator,
706 ©_from_path,
707 copy_database_from_sql.into_bytes(),
708 )
709 .await?;
710
711 info!(
712 "Finished exporting {}.{} copy_from.sql to {}",
713 export_self.catalog,
714 schema,
715 export_self.format_output_path(©_from_path)
716 );
717
718 Ok::<(), Error>(())
719 });
720 }
721
722 let success = self.execute_tasks(tasks).await;
723 let elapsed = timer.elapsed();
724 info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
725
726 Ok(())
727 }
728
729 fn mask_sensitive_sql(&self, sql: &str) -> String {
731 let mut masked_sql = sql.to_string();
732
733 if let Some(access_key) = &self.s3_access_key {
735 masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
736 }
737 if let Some(secret_key) = &self.s3_secret_key {
738 masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
739 }
740
741 if let Some(access_key_id) = &self.oss_access_key_id {
743 masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
744 }
745 if let Some(access_key_secret) = &self.oss_access_key_secret {
746 masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
747 }
748
749 masked_sql
750 }
751
752 fn get_file_path(&self, schema: &str, file_name: &str) -> String {
753 format!("{}/{}/{}", self.catalog, schema, file_name)
754 }
755
756 fn format_output_path(&self, file_path: &str) -> String {
757 if self.s3 {
758 format!(
759 "s3://{}{}/{}",
760 self.s3_bucket.as_ref().unwrap_or(&String::new()),
761 if let Some(root) = &self.s3_root {
762 format!("/{}", root)
763 } else {
764 String::new()
765 },
766 file_path
767 )
768 } else if self.oss {
769 format!(
770 "oss://{}/{}/{}",
771 self.oss_bucket.as_ref().unwrap_or(&String::new()),
772 self.catalog,
773 file_path
774 )
775 } else {
776 format!(
777 "{}/{}",
778 self.output_dir.as_ref().unwrap_or(&String::new()),
779 file_path
780 )
781 }
782 }
783
784 async fn write_to_storage(
785 &self,
786 op: &ObjectStore,
787 file_path: &str,
788 content: Vec<u8>,
789 ) -> Result<()> {
790 op.write(file_path, content)
791 .await
792 .context(OpenDalSnafu)
793 .map(|_| ())
794 }
795
796 fn get_storage_params(&self, schema: &str) -> (String, String) {
797 if self.s3 {
798 let s3_path = format!(
799 "s3://{}{}/{}/{}/",
800 self.s3_bucket.as_ref().unwrap(),
802 if let Some(root) = &self.s3_root {
803 format!("/{}", root)
804 } else {
805 String::new()
806 },
807 self.catalog,
808 schema
809 );
810
811 let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
813 format!(", ENDPOINT='{}'", endpoint)
814 } else {
815 String::new()
816 };
817
818 let connection_options = format!(
821 "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
822 self.s3_access_key.as_ref().unwrap().expose_secret(),
823 self.s3_secret_key.as_ref().unwrap().expose_secret(),
824 self.s3_region.as_ref().unwrap(),
825 endpoint_option
826 );
827
828 (s3_path, format!(" CONNECTION ({})", connection_options))
829 } else if self.oss {
830 let oss_path = format!(
831 "oss://{}/{}/{}/",
832 self.oss_bucket.as_ref().unwrap(),
833 self.catalog,
834 schema
835 );
836 let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
837 format!(", ENDPOINT='{}'", endpoint)
838 } else {
839 String::new()
840 };
841
842 let connection_options = format!(
843 "ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
844 self.oss_access_key_id.as_ref().unwrap().expose_secret(),
845 self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
846 endpoint_option
847 );
848 (oss_path, format!(" CONNECTION ({})", connection_options))
849 } else {
850 (
851 self.catalog_path()
852 .join(format!("{schema}/"))
853 .to_string_lossy()
854 .to_string(),
855 String::new(),
856 )
857 }
858 }
859
860 async fn execute_tasks(
861 &self,
862 tasks: Vec<impl std::future::Future<Output = Result<()>>>,
863 ) -> usize {
864 futures::future::join_all(tasks)
865 .await
866 .into_iter()
867 .filter(|r| match r {
868 Ok(_) => true,
869 Err(e) => {
870 error!(e; "export job failed");
871 false
872 }
873 })
874 .count()
875 }
876}
877
878#[async_trait]
879impl Tool for Export {
880 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
881 match self.target {
882 ExportTarget::Schema => {
883 self.export_create_database()
884 .await
885 .map_err(BoxedError::new)?;
886 self.export_create_table().await.map_err(BoxedError::new)
887 }
888 ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
889 ExportTarget::All => {
890 self.export_create_database()
891 .await
892 .map_err(BoxedError::new)?;
893 self.export_create_table().await.map_err(BoxedError::new)?;
894 self.export_database_data().await.map_err(BoxedError::new)
895 }
896 }
897 }
898}
899
900fn build_with_options(
902 start_time: &Option<String>,
903 end_time: &Option<String>,
904 parallelism: usize,
905) -> String {
906 let mut options = vec!["format = 'parquet'".to_string()];
907 if let Some(start) = start_time {
908 options.push(format!("start_time = '{}'", start));
909 }
910 if let Some(end) = end_time {
911 options.push(format!("end_time = '{}'", end));
912 }
913 options.push(format!("parallelism = {}", parallelism));
914 options.join(", ")
915}