1use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::{Parser, ValueEnum};
21use common_error::ext::BoxedError;
22use common_telemetry::{debug, error, info};
23use object_store::ObjectStore;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::Semaphore;
27use tokio::time::Instant;
28
29use crate::common::{ObjectStoreConfig, new_fs_object_store};
30use crate::data::storage_export::{
31 AzblobBackend, FsBackend, GcsBackend, OssBackend, S3Backend, StorageExport, StorageType,
32};
33use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
34use crate::database::{DatabaseClient, parse_proxy_opts};
35use crate::error::{
36 EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, SchemaNotFoundSnafu,
37};
38use crate::{Tool, database};
39
/// A table identified by `(catalog, schema, table name)`, in that order.
type TableReference = (String, String, String);
41
/// Selects which artifacts an export run produces.
#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export only the `SHOW CREATE` output for databases, tables and views.
    Schema,
    /// Export only the data, via `COPY DATABASE ... TO`.
    Data,
    /// Export the schema statements first, then the data. This is the default.
    #[default]
    All,
}
52
/// Command-line arguments of the `export` tool.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Address of the server; handed to the database client as-is.
    #[clap(long)]
    addr: String,

    /// Directory that receives the exported files when no remote storage backend is
    /// enabled. Building the tool fails if neither this nor a remote backend is given.
    #[clap(long)]
    output_dir: Option<String>,

    /// Database to export. It is split into a catalog and an optional schema; without
    /// a schema every database of the catalog is exported.
    #[clap(long, default_value_t = default_database())]
    database: String,

    /// Maximum number of databases processed concurrently.
    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
    db_parallelism: usize,

    /// Value of the `parallelism` option passed to each `COPY DATABASE` statement.
    #[clap(long, default_value = "4")]
    table_parallelism: usize,

    /// Maximum number of retries.
    // NOTE(review): this value is parsed but not read anywhere in this file's visible
    // code; confirm whether it is still honored.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Which artifacts to export: schema, data, or both.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// Forwarded verbatim as the `start_time` option of `COPY DATABASE`.
    #[clap(long)]
    start_time: Option<String>,

    /// Forwarded verbatim as the `end_time` option of `COPY DATABASE`.
    #[clap(long)]
    end_time: Option<String>,

    /// Basic authentication value handed to the database client.
    // NOTE(review): presumably `user:password`; the expected format is not visible here.
    #[clap(long)]
    auth_basic: Option<String>,

    /// Timeout handed to the database client, in humantime syntax (for example `30s`).
    /// When omitted, a zero duration is passed.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// Proxy setting, interpreted together with `--no-proxy`.
    #[clap(long)]
    proxy: Option<String>,

    /// Passed to the proxy option parser and to the database client.
    // NOTE(review): presumably disables proxy usage; confirm against `parse_proxy_opts`.
    #[clap(long)]
    no_proxy: bool,

    /// Local directory for the generated SQL files (`create_database.sql`,
    /// `create_tables.sql`, `copy_from.sql`) when a remote storage backend is used.
    /// Without it, those files go to the same storage as the data.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    /// Options of the remote object-store backends (S3, OSS, GCS, Azure Blob).
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
136
137impl ExportCommand {
138 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
139 let (storage_type, operator) = if self.storage.enable_s3 {
141 (
142 StorageType::S3(S3Backend::new(self.storage.s3.clone())?),
143 self.storage.build_s3()?,
144 )
145 } else if self.storage.enable_oss {
146 (
147 StorageType::Oss(OssBackend::new(self.storage.oss.clone())?),
148 self.storage.build_oss()?,
149 )
150 } else if self.storage.enable_gcs {
151 (
152 StorageType::Gcs(GcsBackend::new(self.storage.gcs.clone())?),
153 self.storage.build_gcs()?,
154 )
155 } else if self.storage.enable_azblob {
156 (
157 StorageType::Azblob(AzblobBackend::new(self.storage.azblob.clone())?),
158 self.storage.build_azblob()?,
159 )
160 } else if let Some(output_dir) = &self.output_dir {
161 (
162 StorageType::Fs(FsBackend::new(output_dir.clone())),
163 new_fs_object_store(output_dir)?,
164 )
165 } else {
166 return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
167 };
168
169 let (catalog, schema) =
170 database::split_database(&self.database).map_err(BoxedError::new)?;
171 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
172 let database_client = DatabaseClient::new(
173 self.addr.clone(),
174 catalog.clone(),
175 self.auth_basic.clone(),
176 self.timeout.unwrap_or_default(),
178 proxy,
179 self.no_proxy,
180 );
181
182 Ok(Box::new(Export {
183 catalog,
184 schema,
185 database_client,
186 export_jobs: self.db_parallelism,
187 target: self.target.clone(),
188 start_time: self.start_time.clone(),
189 end_time: self.end_time.clone(),
190 parallelism: self.table_parallelism,
191 storage_type,
192 ddl_local_dir: self.ddl_local_dir.clone(),
193 operator,
194 }))
195 }
196}
197
/// The export tool built from an [`ExportCommand`].
///
/// It is cloned into every per-database job, so each job owns its own copy.
#[derive(Clone)]
pub struct Export {
    /// Catalog whose databases are exported.
    catalog: String,
    /// Schema (database) to export; `None` exports every database of the catalog.
    schema: Option<String>,
    /// Client used to run SQL against the server.
    database_client: DatabaseClient,
    /// Number of semaphore permits, i.e. how many databases are processed at once.
    export_jobs: usize,
    /// Which artifacts to export.
    target: ExportTarget,
    /// Optional `start_time` option of `COPY DATABASE`.
    start_time: Option<String>,
    /// Optional `end_time` option of `COPY DATABASE`.
    end_time: Option<String>,
    /// Value of the `parallelism` option of `COPY DATABASE`.
    parallelism: usize,
    /// Description of the selected storage backend; provides paths, connection options
    /// and masking of sensitive values for logging.
    storage_type: StorageType,
    /// Local directory for the generated SQL files when the storage is remote.
    ddl_local_dir: Option<String>,
    /// Object store for the selected storage backend.
    operator: ObjectStore,
}
212
213impl Export {
214 async fn get_db_names(&self) -> Result<Vec<String>> {
215 let db_names = self.all_db_names().await?;
216 let Some(schema) = &self.schema else {
217 return Ok(db_names);
218 };
219
220 db_names
222 .into_iter()
223 .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
224 .map(|name| vec![name])
225 .context(SchemaNotFoundSnafu {
226 catalog: &self.catalog,
227 schema,
228 })
229 }
230
231 async fn all_db_names(&self) -> Result<Vec<String>> {
233 let records = self
234 .database_client
235 .sql_in_public("SHOW DATABASES")
236 .await?
237 .context(EmptyResultSnafu)?;
238 let mut result = Vec::with_capacity(records.len());
239 for value in records {
240 let Value::String(schema) = &value[0] else {
241 unreachable!()
242 };
243 if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
244 continue;
245 }
246 if schema == common_catalog::consts::PG_CATALOG_NAME {
247 continue;
248 }
249 result.push(schema.clone());
250 }
251 Ok(result)
252 }
253
254 async fn get_table_list(
257 &self,
258 catalog: &str,
259 schema: &str,
260 ) -> Result<(
261 Vec<TableReference>,
262 Vec<TableReference>,
263 Vec<TableReference>,
264 )> {
265 let sql = format!(
267 "SELECT table_catalog, table_schema, table_name \
268 FROM information_schema.columns \
269 WHERE column_name = '__tsid' \
270 and table_catalog = \'{catalog}\' \
271 and table_schema = \'{schema}\'"
272 );
273 let records = self
274 .database_client
275 .sql_in_public(&sql)
276 .await?
277 .context(EmptyResultSnafu)?;
278 let mut metric_physical_tables = HashSet::with_capacity(records.len());
279 for value in records {
280 let mut t = Vec::with_capacity(3);
281 for v in &value {
282 let Value::String(value) = v else {
283 unreachable!()
284 };
285 t.push(value);
286 }
287 metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
288 }
289
290 let sql = format!(
291 "SELECT table_catalog, table_schema, table_name, table_type \
292 FROM information_schema.tables \
293 WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
294 and table_catalog = \'{catalog}\' \
295 and table_schema = \'{schema}\'",
296 );
297 let records = self
298 .database_client
299 .sql_in_public(&sql)
300 .await?
301 .context(EmptyResultSnafu)?;
302
303 debug!("Fetched table/view list: {:?}", records);
304
305 if records.is_empty() {
306 return Ok((vec![], vec![], vec![]));
307 }
308
309 let mut remaining_tables = Vec::with_capacity(records.len());
310 let mut views = Vec::new();
311 for value in records {
312 let mut t = Vec::with_capacity(4);
313 for v in &value {
314 let Value::String(value) = v else {
315 unreachable!()
316 };
317 t.push(value);
318 }
319 let table = (t[0].clone(), t[1].clone(), t[2].clone());
320 let table_type = t[3].as_str();
321 if !metric_physical_tables.contains(&table) {
323 if table_type == "VIEW" {
324 views.push(table);
325 } else {
326 remaining_tables.push(table);
327 }
328 }
329 }
330
331 Ok((
332 metric_physical_tables.into_iter().collect(),
333 remaining_tables,
334 views,
335 ))
336 }
337
338 async fn show_create(
339 &self,
340 show_type: &str,
341 catalog: &str,
342 schema: &str,
343 table: Option<&str>,
344 ) -> Result<String> {
345 let sql = match table {
346 Some(table) => format!(
347 r#"SHOW CREATE {} "{}"."{}"."{}""#,
348 show_type, catalog, schema, table
349 ),
350 None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
351 };
352 let records = self
353 .database_client
354 .sql_in_public(&sql)
355 .await?
356 .context(EmptyResultSnafu)?;
357 let Value::String(create) = &records[0][1] else {
358 unreachable!()
359 };
360
361 Ok(format!("{};\n", create))
362 }
363
364 async fn export_create_database(&self) -> Result<()> {
365 let timer = Instant::now();
366 let db_names = self.get_db_names().await?;
367 let db_count = db_names.len();
368 let operator = self.build_prefer_fs_operator().await?;
369
370 for schema in db_names {
371 let create_database = self
372 .show_create("DATABASE", &self.catalog, &schema, None)
373 .await?;
374
375 let file_path = self.get_file_path(&schema, "create_database.sql");
376 self.write_to_storage(&operator, &file_path, create_database.into_bytes())
377 .await?;
378
379 info!(
380 "Exported {}.{} database creation SQL to {}",
381 self.catalog,
382 schema,
383 self.storage_type.format_output_path(&file_path)
384 );
385 }
386
387 let elapsed = timer.elapsed();
388 info!("Success {db_count} jobs, cost: {elapsed:?}");
389
390 Ok(())
391 }
392
393 async fn export_create_table(&self) -> Result<()> {
394 let timer = Instant::now();
395 let semaphore = Arc::new(Semaphore::new(self.export_jobs));
396 let db_names = self.get_db_names().await?;
397 let db_count = db_names.len();
398 let operator = Arc::new(self.build_prefer_fs_operator().await?);
399 let mut tasks = Vec::with_capacity(db_names.len());
400
401 for schema in db_names {
402 let semaphore_moved = semaphore.clone();
403 let export_self = self.clone();
404 let operator = operator.clone();
405 tasks.push(async move {
406 let _permit = semaphore_moved.acquire().await.unwrap();
407 let (metric_physical_tables, remaining_tables, views) = export_self
408 .get_table_list(&export_self.catalog, &schema)
409 .await?;
410
411 if !export_self.storage_type.is_remote_storage() {
413 let db_dir = format!("{}/{}/", export_self.catalog, schema);
414 operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
415 }
416
417 let file_path = export_self.get_file_path(&schema, "create_tables.sql");
418 let mut content = Vec::new();
419
420 for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
422 let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
423 content.extend_from_slice(create_table.as_bytes());
424 }
425
426 for (c, s, v) in &views {
428 let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
429 content.extend_from_slice(create_view.as_bytes());
430 }
431
432 export_self
434 .write_to_storage(&operator, &file_path, content)
435 .await?;
436
437 info!(
438 "Finished exporting {}.{schema} with {} table schemas to path: {}",
439 export_self.catalog,
440 metric_physical_tables.len() + remaining_tables.len() + views.len(),
441 export_self.storage_type.format_output_path(&file_path)
442 );
443
444 Ok::<(), Error>(())
445 });
446 }
447
448 let success = self.execute_tasks(tasks).await;
449 let elapsed = timer.elapsed();
450 info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
451
452 Ok(())
453 }
454
    /// Returns a clone of the object store selected when the tool was built.
    async fn build_operator(&self) -> Result<ObjectStore> {
        Ok(self.operator.clone())
    }
458
459 async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
461 if self.storage_type.is_remote_storage()
462 && let Some(ddl_local_dir) = &self.ddl_local_dir
463 {
464 let root = ddl_local_dir.clone();
465 let op = new_fs_object_store(&root).map_err(|e| Error::Other {
466 source: e,
467 location: snafu::location!(),
468 })?;
469 Ok(op)
470 } else {
471 Ok(self.operator.clone())
472 }
473 }
474
475 async fn export_database_data(&self) -> Result<()> {
476 let timer = Instant::now();
477 let semaphore = Arc::new(Semaphore::new(self.export_jobs));
478 let db_names = self.get_db_names().await?;
479 let db_count = db_names.len();
480 let mut tasks = Vec::with_capacity(db_count);
481 let operator = Arc::new(self.build_operator().await?);
482 let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
483 let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);
484
485 for schema in db_names {
486 let semaphore_moved = semaphore.clone();
487 let export_self = self.clone();
488 let with_options_clone = with_options.clone();
489 let operator = operator.clone();
490 let fs_first_operator = fs_first_operator.clone();
491
492 tasks.push(async move {
493 let _permit = semaphore_moved.acquire().await.unwrap();
494
495 if !export_self.storage_type.is_remote_storage() {
497 let db_dir = format!("{}/{}/", export_self.catalog, schema);
498 operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
499 }
500
501 let (path, connection_part) = export_self
502 .storage_type
503 .get_storage_path(&export_self.catalog, &schema);
504
505 let sql = format!(
507 r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
508 export_self.catalog, schema, path, with_options_clone, connection_part
509 );
510
511 let safe_sql = export_self.storage_type.mask_sensitive_info(&sql);
513 info!("Executing sql: {}", safe_sql);
514
515 export_self.database_client.sql_in_public(&sql).await?;
516 info!(
517 "Finished exporting {}.{} data to {}",
518 export_self.catalog, schema, path
519 );
520
521 let copy_database_from_sql = {
523 let command_without_connection = format!(
524 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
525 export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
526 );
527
528 if connection_part.is_empty() {
529 command_without_connection
530 } else {
531 let command_with_connection = format!(
532 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
533 export_self.catalog, schema, path, with_options_clone, connection_part
534 );
535
536 format!(
537 "-- {}\n{}",
538 command_with_connection, command_without_connection
539 )
540 }
541 };
542
543 let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
544 export_self
545 .write_to_storage(
546 &fs_first_operator,
547 ©_from_path,
548 copy_database_from_sql.into_bytes(),
549 )
550 .await?;
551
552 info!(
553 "Finished exporting {}.{} copy_from.sql to {}",
554 export_self.catalog,
555 schema,
556 export_self.storage_type.format_output_path(©_from_path)
557 );
558
559 Ok::<(), Error>(())
560 });
561 }
562
563 let success = self.execute_tasks(tasks).await;
564 let elapsed = timer.elapsed();
565 info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
566
567 Ok(())
568 }
569
570 fn get_file_path(&self, schema: &str, file_name: &str) -> String {
571 format!("{}/{}/{}", self.catalog, schema, file_name)
572 }
573
574 async fn write_to_storage(
575 &self,
576 op: &ObjectStore,
577 file_path: &str,
578 content: Vec<u8>,
579 ) -> Result<()> {
580 op.write(file_path, content)
581 .await
582 .context(OpenDalSnafu)
583 .map(|_| ())
584 }
585
586 async fn execute_tasks(
587 &self,
588 tasks: Vec<impl std::future::Future<Output = Result<()>>>,
589 ) -> usize {
590 futures::future::join_all(tasks)
591 .await
592 .into_iter()
593 .filter(|r| match r {
594 Ok(_) => true,
595 Err(e) => {
596 error!(e; "export job failed");
597 false
598 }
599 })
600 .count()
601 }
602}
603
604#[async_trait]
605impl Tool for Export {
606 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
607 match self.target {
608 ExportTarget::Schema => {
609 self.export_create_database()
610 .await
611 .map_err(BoxedError::new)?;
612 self.export_create_table().await.map_err(BoxedError::new)
613 }
614 ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
615 ExportTarget::All => {
616 self.export_create_database()
617 .await
618 .map_err(BoxedError::new)?;
619 self.export_create_table().await.map_err(BoxedError::new)?;
620 self.export_database_data().await.map_err(BoxedError::new)
621 }
622 }
623 }
624}
625
/// Builds the comma-separated option list placed inside `WITH (...)` of a
/// `COPY DATABASE` statement.
///
/// The list always starts with `format = 'parquet'` and ends with
/// `parallelism = <parallelism>`; `start_time` and `end_time` are inserted in between,
/// in that order, when they are present.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let start = start_time
        .as_ref()
        .map(|start| format!("start_time = '{}'", start));
    let end = end_time
        .as_ref()
        .map(|end| format!("end_time = '{}'", end));

    // Absent bounds are `None` and simply drop out when flattening.
    vec![
        Some("format = 'parquet'".to_string()),
        start,
        end,
        Some(format!("parallelism = {}", parallelism)),
    ]
    .into_iter()
    .flatten()
    .collect::<Vec<String>>()
    .join(", ")
}
642
#[cfg(test)]
mod tests {
    use clap::Parser;
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    /// A syntactically valid base64 value used as a fake Azure Blob account key.
    const MOCK_AZBLOB_ACCOUNT_KEY_B64: &str = "dGVzdC1rZXk=";

    /// Arguments every invocation starts with: program name and server address.
    const BASE_ARGS: [&str; 3] = ["export", "--addr", "127.0.0.1:4000"];

    /// Returns the full argument vector: [`BASE_ARGS`] followed by `extra`.
    fn export_args<'a>(extra: &[&'a str]) -> Vec<&'a str> {
        let mut argv: Vec<&'a str> = BASE_ARGS.to_vec();
        argv.extend_from_slice(extra);
        argv
    }

    /// Parses `extra` on top of [`BASE_ARGS`] and builds the export tool.
    async fn build_export(extra: &[&str]) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        let cmd = ExportCommand::parse_from(export_args(extra));
        cmd.build().await
    }

    /// Asserts that building fails and that the error message contains `expected`.
    async fn assert_build_fails_with(extra: &[&str], expected: &str) {
        match build_export(extra).await {
            Ok(_) => panic!("expected build to fail with: {}", expected),
            Err(err) => assert!(
                err.to_string().contains(expected),
                "Actual error: {}",
                err
            ),
        }
    }

    /// Parses `extra` on top of [`BASE_ARGS`] without building the tool.
    fn try_parse(extra: &[&str]) -> std::result::Result<ExportCommand, clap::Error> {
        ExportCommand::try_parse_from(export_args(extra))
    }

    /// Asserts that parsing fails because the enabling `flag` of a backend is missing.
    fn assert_missing_enable_flag(extra: &[&str], flag: &str) {
        let err = try_parse(extra).unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains(flag));
    }

    // ---- Successful builds ----

    #[tokio::test]
    async fn test_export_command_build_with_local_fs() {
        let temp_dir = create_temp_dir("test_export_local_fs");
        let output_dir = temp_dir.path().to_str().unwrap();

        let result = build_export(&["--output-dir", output_dir]).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_s3_success() {
        let result = build_export(&[
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-root",
            "test-root",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
            "--s3-endpoint",
            "https://s3.amazonaws.com",
        ])
        .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_success() {
        let result = build_export(&[
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-root",
            "test-root",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ])
        .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_success() {
        let result = build_export(&[
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
            "--gcs-credential",
            "test-credential-content",
            "--gcs-endpoint",
            "https://storage.googleapis.com",
        ])
        .await;
        assert!(result.is_ok());
    }

    /// No credential and no endpoint are supplied for GCS.
    #[tokio::test]
    async fn test_export_command_build_with_gcs_adc_success() {
        let result = build_export(&[
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
        ])
        .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_success() {
        let result = build_export(&[
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ])
        .await;
        assert!(result.is_ok());
    }

    /// Both an account key and a SAS token are supplied.
    #[tokio::test]
    async fn test_export_command_build_with_azblob_with_sas_token() {
        let result = build_export(&[
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ])
        .await;
        assert!(result.is_ok());
    }

    // ---- Argument parsing failures ----

    /// Enabling two remote backends at once is rejected by the argument parser.
    #[test]
    fn test_export_command_build_with_conflict() {
        let result = try_parse(&["--s3", "--oss"]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
    }

    /// S3 options without `--s3` are rejected by the argument parser.
    #[test]
    fn test_export_command_build_with_s3_no_enable_flag() {
        assert_missing_enable_flag(
            &[
                "--s3-bucket",
                "test-bucket",
                "--s3-access-key-id",
                "test-key",
                "--output-dir",
                "/tmp/test",
            ],
            "--s3",
        );
    }

    /// OSS options without `--oss` are rejected by the argument parser.
    #[test]
    fn test_export_command_build_with_oss_no_enable_flag() {
        assert_missing_enable_flag(
            &["--oss-bucket", "test-bucket", "--output-dir", "/tmp/test"],
            "--oss",
        );
    }

    /// GCS options without `--gcs` are rejected by the argument parser.
    #[test]
    fn test_export_command_build_with_gcs_no_enable_flag() {
        assert_missing_enable_flag(
            &["--gcs-bucket", "test-bucket", "--output-dir", "/tmp/test"],
            "--gcs",
        );
    }

    /// Azure Blob options without `--azblob` are rejected by the argument parser.
    #[test]
    fn test_export_command_build_with_azblob_no_enable_flag() {
        assert_missing_enable_flag(
            &[
                "--azblob-container",
                "test-container",
                "--output-dir",
                "/tmp/test",
            ],
            "--azblob",
        );
    }

    // ---- Validation of empty or missing backend fields ----

    /// An empty S3 root is accepted.
    #[tokio::test]
    async fn test_export_command_build_with_s3_empty_root() {
        let result = build_export(&[
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-root",
            "",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
        ])
        .await;
        assert!(
            result.is_ok(),
            "Expected success but got: {:?}",
            result.err()
        );
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_empty_access_key_id() {
        assert_build_fails_with(
            &[
                "--oss",
                "--oss-bucket",
                "test-bucket",
                "--oss-access-key-id",
                "",
                "--oss-access-key-secret",
                "test-secret",
                "--oss-endpoint",
                "https://oss.example.com",
            ],
            "OSS access key ID must be set",
        )
        .await;
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_missing_endpoint() {
        assert_build_fails_with(
            &[
                "--oss",
                "--oss-bucket",
                "test-bucket",
                "--oss-root",
                "test-root",
                "--oss-access-key-id",
                "test-key-id",
                "--oss-access-key-secret",
                "test-secret",
            ],
            "OSS endpoint must be set",
        )
        .await;
    }

    /// Only the bucket is given; the error has to name OSS and the missing fields.
    #[tokio::test]
    async fn test_export_command_build_with_oss_multiple_missing_fields() {
        let result = build_export(&["--oss", "--oss-bucket", "test-bucket"]).await;
        assert!(result.is_err());
        if let Err(err) = result {
            let err_str = err.to_string();
            assert!(
                err_str.contains("OSS"),
                "Error should mention OSS: {}",
                err_str
            );
            assert!(
                err_str.contains("must be set"),
                "Error should mention required fields: {}",
                err_str
            );
        }
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_empty_bucket() {
        assert_build_fails_with(
            &[
                "--gcs",
                "--gcs-bucket",
                "",
                "--gcs-root",
                "test-root",
                "--gcs-scope",
                "test-scope",
            ],
            "GCS bucket must be set",
        )
        .await;
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_empty_root() {
        assert_build_fails_with(
            &[
                "--gcs",
                "--gcs-bucket",
                "test-bucket",
                "--gcs-root",
                "",
                "--gcs-scope",
                "test-scope",
                "--gcs-credential",
                "test-credential",
                "--gcs-endpoint",
                "https://storage.googleapis.com",
            ],
            "GCS root must be set",
        )
        .await;
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_empty_account_name() {
        assert_build_fails_with(
            &[
                "--azblob",
                "--azblob-container",
                "test-container",
                "--azblob-root",
                "test-root",
                "--azblob-account-name",
                "",
                "--azblob-account-key",
                MOCK_AZBLOB_ACCOUNT_KEY_B64,
                "--azblob-endpoint",
                "https://account.blob.core.windows.net",
            ],
            "AzBlob account name must be set",
        )
        .await;
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_missing_account_key() {
        assert_build_fails_with(
            &[
                "--azblob",
                "--azblob-container",
                "test-container",
                "--azblob-root",
                "test-root",
                "--azblob-account-name",
                "test-account",
                "--azblob-endpoint",
                "https://account.blob.core.windows.net",
            ],
            "AzBlob account key (when sas_token is not provided) must be set",
        )
        .await;
    }

    /// Neither a remote backend nor `--output-dir` is given.
    #[tokio::test]
    async fn test_export_command_build_with_no_storage() {
        assert_build_fails_with(&[], "Output directory not set").await;
    }

    // ---- Minimal configurations ----

    #[tokio::test]
    async fn test_export_command_build_with_s3_minimal_config() {
        let result = build_export(&[
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
        ])
        .await;
        assert!(result.is_ok(), "Minimal S3 config should succeed");
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_minimal_config() {
        let result = build_export(&[
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ])
        .await;
        assert!(result.is_ok(), "Minimal OSS config should succeed");
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_minimal_config() {
        let result = build_export(&[
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
        ])
        .await;
        assert!(result.is_ok(), "Minimal GCS config should succeed");
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_minimal_config() {
        let result = build_export(&[
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ])
        .await;
        assert!(result.is_ok(), "Minimal AzBlob config should succeed");
    }

    // ---- Backend combinations ----

    /// `--output-dir` together with `--s3` still builds successfully.
    #[tokio::test]
    async fn test_export_command_build_with_local_and_s3() {
        let temp_dir = create_temp_dir("test_export_local_and_s3");
        let output_dir = temp_dir.path().to_str().unwrap();

        let result = build_export(&[
            "--output-dir",
            output_dir,
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
        ])
        .await;
        assert!(
            result.is_ok(),
            "S3 should be selected when both are provided"
        );
    }

    /// A SAS token alone, without an account key, is sufficient.
    #[tokio::test]
    async fn test_export_command_build_with_azblob_only_sas_token() {
        let result = build_export(&[
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ])
        .await;
        assert!(
            result.is_ok(),
            "AzBlob with only sas_token should succeed: {:?}",
            result.err()
        );
    }

    /// An empty account key is accepted as long as a SAS token is present.
    #[tokio::test]
    async fn test_export_command_build_with_azblob_empty_account_key_with_sas() {
        let result = build_export(&[
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            "",
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ])
        .await;
        assert!(
            result.is_ok(),
            "AzBlob with empty account_key but sas_token should succeed: {:?}",
            result.err()
        );
    }
}