1use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::{Parser, ValueEnum};
21use common_error::ext::BoxedError;
22use common_telemetry::{debug, error, info};
23use object_store::ObjectStore;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::Semaphore;
27use tokio::time::Instant;
28
29use crate::common::{ObjectStoreConfig, new_fs_object_store};
30use crate::data::storage_export::{
31 AzblobBackend, FsBackend, GcsBackend, OssBackend, S3Backend, StorageExport, StorageType,
32};
33use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
34use crate::database::{DatabaseClient, parse_proxy_opts};
35use crate::error::{
36 EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, SchemaNotFoundSnafu,
37};
38use crate::{Tool, database};
39
40type TableReference = (String, String, String);
41
42#[derive(Debug, Default, Clone, ValueEnum)]
43enum ExportTarget {
44 Schema,
46 Data,
48 #[default]
50 All,
51}
52
53#[derive(Debug, Default, Parser)]
55pub struct ExportCommand {
56 #[clap(long)]
58 addr: String,
59
60 #[clap(long)]
63 output_dir: Option<String>,
64
65 #[clap(long, default_value_t = default_database())]
67 database: String,
68
69 #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
73 db_parallelism: usize,
74
75 #[clap(long, default_value = "4")]
79 table_parallelism: usize,
80
81 #[clap(long, default_value = "3")]
83 max_retry: usize,
84
85 #[clap(long, short = 't', value_enum, default_value = "all")]
87 target: ExportTarget,
88
89 #[clap(long)]
92 start_time: Option<String>,
93
94 #[clap(long)]
97 end_time: Option<String>,
98
99 #[clap(long)]
101 auth_basic: Option<String>,
102
103 #[clap(long, value_parser = humantime::parse_duration)]
108 timeout: Option<Duration>,
109
110 #[clap(long)]
114 proxy: Option<String>,
115
116 #[clap(long)]
118 no_proxy: bool,
119
120 #[clap(long)]
128 ddl_local_dir: Option<String>,
129
130 #[clap(flatten)]
131 storage: ObjectStoreConfig,
132}
133
134impl ExportCommand {
135 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
136 let (storage_type, operator) = if self.storage.enable_s3 {
138 (
139 StorageType::S3(S3Backend::new(self.storage.s3.clone())?),
140 self.storage.build_s3()?,
141 )
142 } else if self.storage.enable_oss {
143 (
144 StorageType::Oss(OssBackend::new(self.storage.oss.clone())?),
145 self.storage.build_oss()?,
146 )
147 } else if self.storage.enable_gcs {
148 (
149 StorageType::Gcs(GcsBackend::new(self.storage.gcs.clone())?),
150 self.storage.build_gcs()?,
151 )
152 } else if self.storage.enable_azblob {
153 (
154 StorageType::Azblob(AzblobBackend::new(self.storage.azblob.clone())?),
155 self.storage.build_azblob()?,
156 )
157 } else if let Some(output_dir) = &self.output_dir {
158 (
159 StorageType::Fs(FsBackend::new(output_dir.clone())),
160 new_fs_object_store(output_dir)?,
161 )
162 } else {
163 return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
164 };
165
166 let (catalog, schema) =
167 database::split_database(&self.database).map_err(BoxedError::new)?;
168 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
169 let database_client = DatabaseClient::new(
170 self.addr.clone(),
171 catalog.clone(),
172 self.auth_basic.clone(),
173 self.timeout.unwrap_or_default(),
175 proxy,
176 );
177
178 Ok(Box::new(Export {
179 catalog,
180 schema,
181 database_client,
182 export_jobs: self.db_parallelism,
183 target: self.target.clone(),
184 start_time: self.start_time.clone(),
185 end_time: self.end_time.clone(),
186 parallelism: self.table_parallelism,
187 storage_type,
188 ddl_local_dir: self.ddl_local_dir.clone(),
189 operator,
190 }))
191 }
192}
193
194#[derive(Clone)]
195pub struct Export {
196 catalog: String,
197 schema: Option<String>,
198 database_client: DatabaseClient,
199 export_jobs: usize,
200 target: ExportTarget,
201 start_time: Option<String>,
202 end_time: Option<String>,
203 parallelism: usize,
204 storage_type: StorageType,
205 ddl_local_dir: Option<String>,
206 operator: ObjectStore,
207}
208
209impl Export {
210 async fn get_db_names(&self) -> Result<Vec<String>> {
211 let db_names = self.all_db_names().await?;
212 let Some(schema) = &self.schema else {
213 return Ok(db_names);
214 };
215
216 db_names
218 .into_iter()
219 .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
220 .map(|name| vec![name])
221 .context(SchemaNotFoundSnafu {
222 catalog: &self.catalog,
223 schema,
224 })
225 }
226
227 async fn all_db_names(&self) -> Result<Vec<String>> {
229 let records = self
230 .database_client
231 .sql_in_public("SHOW DATABASES")
232 .await?
233 .context(EmptyResultSnafu)?;
234 let mut result = Vec::with_capacity(records.len());
235 for value in records {
236 let Value::String(schema) = &value[0] else {
237 unreachable!()
238 };
239 if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
240 continue;
241 }
242 if schema == common_catalog::consts::PG_CATALOG_NAME {
243 continue;
244 }
245 result.push(schema.clone());
246 }
247 Ok(result)
248 }
249
250 async fn get_table_list(
253 &self,
254 catalog: &str,
255 schema: &str,
256 ) -> Result<(
257 Vec<TableReference>,
258 Vec<TableReference>,
259 Vec<TableReference>,
260 )> {
261 let sql = format!(
263 "SELECT table_catalog, table_schema, table_name \
264 FROM information_schema.columns \
265 WHERE column_name = '__tsid' \
266 and table_catalog = \'{catalog}\' \
267 and table_schema = \'{schema}\'"
268 );
269 let records = self
270 .database_client
271 .sql_in_public(&sql)
272 .await?
273 .context(EmptyResultSnafu)?;
274 let mut metric_physical_tables = HashSet::with_capacity(records.len());
275 for value in records {
276 let mut t = Vec::with_capacity(3);
277 for v in &value {
278 let Value::String(value) = v else {
279 unreachable!()
280 };
281 t.push(value);
282 }
283 metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
284 }
285
286 let sql = format!(
287 "SELECT table_catalog, table_schema, table_name, table_type \
288 FROM information_schema.tables \
289 WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
290 and table_catalog = \'{catalog}\' \
291 and table_schema = \'{schema}\'",
292 );
293 let records = self
294 .database_client
295 .sql_in_public(&sql)
296 .await?
297 .context(EmptyResultSnafu)?;
298
299 debug!("Fetched table/view list: {:?}", records);
300
301 if records.is_empty() {
302 return Ok((vec![], vec![], vec![]));
303 }
304
305 let mut remaining_tables = Vec::with_capacity(records.len());
306 let mut views = Vec::new();
307 for value in records {
308 let mut t = Vec::with_capacity(4);
309 for v in &value {
310 let Value::String(value) = v else {
311 unreachable!()
312 };
313 t.push(value);
314 }
315 let table = (t[0].clone(), t[1].clone(), t[2].clone());
316 let table_type = t[3].as_str();
317 if !metric_physical_tables.contains(&table) {
319 if table_type == "VIEW" {
320 views.push(table);
321 } else {
322 remaining_tables.push(table);
323 }
324 }
325 }
326
327 Ok((
328 metric_physical_tables.into_iter().collect(),
329 remaining_tables,
330 views,
331 ))
332 }
333
334 async fn show_create(
335 &self,
336 show_type: &str,
337 catalog: &str,
338 schema: &str,
339 table: Option<&str>,
340 ) -> Result<String> {
341 let sql = match table {
342 Some(table) => format!(
343 r#"SHOW CREATE {} "{}"."{}"."{}""#,
344 show_type, catalog, schema, table
345 ),
346 None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
347 };
348 let records = self
349 .database_client
350 .sql_in_public(&sql)
351 .await?
352 .context(EmptyResultSnafu)?;
353 let Value::String(create) = &records[0][1] else {
354 unreachable!()
355 };
356
357 Ok(format!("{};\n", create))
358 }
359
360 async fn export_create_database(&self) -> Result<()> {
361 let timer = Instant::now();
362 let db_names = self.get_db_names().await?;
363 let db_count = db_names.len();
364 let operator = self.build_prefer_fs_operator().await?;
365
366 for schema in db_names {
367 let create_database = self
368 .show_create("DATABASE", &self.catalog, &schema, None)
369 .await?;
370
371 let file_path = self.get_file_path(&schema, "create_database.sql");
372 self.write_to_storage(&operator, &file_path, create_database.into_bytes())
373 .await?;
374
375 info!(
376 "Exported {}.{} database creation SQL to {}",
377 self.catalog,
378 schema,
379 self.storage_type.format_output_path(&file_path)
380 );
381 }
382
383 let elapsed = timer.elapsed();
384 info!("Success {db_count} jobs, cost: {elapsed:?}");
385
386 Ok(())
387 }
388
389 async fn export_create_table(&self) -> Result<()> {
390 let timer = Instant::now();
391 let semaphore = Arc::new(Semaphore::new(self.export_jobs));
392 let db_names = self.get_db_names().await?;
393 let db_count = db_names.len();
394 let operator = Arc::new(self.build_prefer_fs_operator().await?);
395 let mut tasks = Vec::with_capacity(db_names.len());
396
397 for schema in db_names {
398 let semaphore_moved = semaphore.clone();
399 let export_self = self.clone();
400 let operator = operator.clone();
401 tasks.push(async move {
402 let _permit = semaphore_moved.acquire().await.unwrap();
403 let (metric_physical_tables, remaining_tables, views) = export_self
404 .get_table_list(&export_self.catalog, &schema)
405 .await?;
406
407 if !export_self.storage_type.is_remote_storage() {
409 let db_dir = format!("{}/{}/", export_self.catalog, schema);
410 operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
411 }
412
413 let file_path = export_self.get_file_path(&schema, "create_tables.sql");
414 let mut content = Vec::new();
415
416 for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
418 let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
419 content.extend_from_slice(create_table.as_bytes());
420 }
421
422 for (c, s, v) in &views {
424 let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
425 content.extend_from_slice(create_view.as_bytes());
426 }
427
428 export_self
430 .write_to_storage(&operator, &file_path, content)
431 .await?;
432
433 info!(
434 "Finished exporting {}.{schema} with {} table schemas to path: {}",
435 export_self.catalog,
436 metric_physical_tables.len() + remaining_tables.len() + views.len(),
437 export_self.storage_type.format_output_path(&file_path)
438 );
439
440 Ok::<(), Error>(())
441 });
442 }
443
444 let success = self.execute_tasks(tasks).await;
445 let elapsed = timer.elapsed();
446 info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
447
448 Ok(())
449 }
450
451 async fn build_operator(&self) -> Result<ObjectStore> {
452 Ok(self.operator.clone())
453 }
454
455 async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
457 if self.storage_type.is_remote_storage() && self.ddl_local_dir.is_some() {
458 let root = self.ddl_local_dir.as_ref().unwrap().clone();
459 let op = new_fs_object_store(&root).map_err(|e| Error::Other {
460 source: e,
461 location: snafu::location!(),
462 })?;
463 Ok(op)
464 } else {
465 Ok(self.operator.clone())
466 }
467 }
468
469 async fn export_database_data(&self) -> Result<()> {
470 let timer = Instant::now();
471 let semaphore = Arc::new(Semaphore::new(self.export_jobs));
472 let db_names = self.get_db_names().await?;
473 let db_count = db_names.len();
474 let mut tasks = Vec::with_capacity(db_count);
475 let operator = Arc::new(self.build_operator().await?);
476 let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
477 let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);
478
479 for schema in db_names {
480 let semaphore_moved = semaphore.clone();
481 let export_self = self.clone();
482 let with_options_clone = with_options.clone();
483 let operator = operator.clone();
484 let fs_first_operator = fs_first_operator.clone();
485
486 tasks.push(async move {
487 let _permit = semaphore_moved.acquire().await.unwrap();
488
489 if !export_self.storage_type.is_remote_storage() {
491 let db_dir = format!("{}/{}/", export_self.catalog, schema);
492 operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
493 }
494
495 let (path, connection_part) = export_self
496 .storage_type
497 .get_storage_path(&export_self.catalog, &schema);
498
499 let sql = format!(
501 r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
502 export_self.catalog, schema, path, with_options_clone, connection_part
503 );
504
505 let safe_sql = export_self.storage_type.mask_sensitive_info(&sql);
507 info!("Executing sql: {}", safe_sql);
508
509 export_self.database_client.sql_in_public(&sql).await?;
510 info!(
511 "Finished exporting {}.{} data to {}",
512 export_self.catalog, schema, path
513 );
514
515 let copy_database_from_sql = {
517 let command_without_connection = format!(
518 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
519 export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
520 );
521
522 if connection_part.is_empty() {
523 command_without_connection
524 } else {
525 let command_with_connection = format!(
526 r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
527 export_self.catalog, schema, path, with_options_clone, connection_part
528 );
529
530 format!(
531 "-- {}\n{}",
532 command_with_connection, command_without_connection
533 )
534 }
535 };
536
537 let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
538 export_self
539 .write_to_storage(
540 &fs_first_operator,
541 ©_from_path,
542 copy_database_from_sql.into_bytes(),
543 )
544 .await?;
545
546 info!(
547 "Finished exporting {}.{} copy_from.sql to {}",
548 export_self.catalog,
549 schema,
550 export_self.storage_type.format_output_path(©_from_path)
551 );
552
553 Ok::<(), Error>(())
554 });
555 }
556
557 let success = self.execute_tasks(tasks).await;
558 let elapsed = timer.elapsed();
559 info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
560
561 Ok(())
562 }
563
564 fn get_file_path(&self, schema: &str, file_name: &str) -> String {
565 format!("{}/{}/{}", self.catalog, schema, file_name)
566 }
567
568 async fn write_to_storage(
569 &self,
570 op: &ObjectStore,
571 file_path: &str,
572 content: Vec<u8>,
573 ) -> Result<()> {
574 op.write(file_path, content)
575 .await
576 .context(OpenDalSnafu)
577 .map(|_| ())
578 }
579
580 async fn execute_tasks(
581 &self,
582 tasks: Vec<impl std::future::Future<Output = Result<()>>>,
583 ) -> usize {
584 futures::future::join_all(tasks)
585 .await
586 .into_iter()
587 .filter(|r| match r {
588 Ok(_) => true,
589 Err(e) => {
590 error!(e; "export job failed");
591 false
592 }
593 })
594 .count()
595 }
596}
597
598#[async_trait]
599impl Tool for Export {
600 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
601 match self.target {
602 ExportTarget::Schema => {
603 self.export_create_database()
604 .await
605 .map_err(BoxedError::new)?;
606 self.export_create_table().await.map_err(BoxedError::new)
607 }
608 ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
609 ExportTarget::All => {
610 self.export_create_database()
611 .await
612 .map_err(BoxedError::new)?;
613 self.export_create_table().await.map_err(BoxedError::new)?;
614 self.export_database_data().await.map_err(BoxedError::new)
615 }
616 }
617 }
618}
619
620fn build_with_options(
622 start_time: &Option<String>,
623 end_time: &Option<String>,
624 parallelism: usize,
625) -> String {
626 let mut options = vec!["format = 'parquet'".to_string()];
627 if let Some(start) = start_time {
628 options.push(format!("start_time = '{}'", start));
629 }
630 if let Some(end) = end_time {
631 options.push(format!("end_time = '{}'", end));
632 }
633 options.push(format!("parallelism = {}", parallelism));
634 options.join(", ")
635}
636
637#[cfg(test)]
638mod tests {
639 use clap::Parser;
640 use common_test_util::temp_dir::create_temp_dir;
641
642 use super::*;
643
644 const MOCK_AZBLOB_ACCOUNT_KEY_B64: &str = "dGVzdC1rZXk=";
645
646 #[tokio::test]
649 async fn test_export_command_build_with_local_fs() {
650 let temp_dir = create_temp_dir("test_export_local_fs");
651 let output_dir = temp_dir.path().to_str().unwrap();
652
653 let cmd = ExportCommand::parse_from([
654 "export",
655 "--addr",
656 "127.0.0.1:4000",
657 "--output-dir",
658 output_dir,
659 ]);
660
661 let result = cmd.build().await;
662 assert!(result.is_ok());
663 }
664
665 #[tokio::test]
666 async fn test_export_command_build_with_s3_success() {
667 let cmd = ExportCommand::parse_from([
668 "export",
669 "--addr",
670 "127.0.0.1:4000",
671 "--s3",
672 "--s3-bucket",
673 "test-bucket",
674 "--s3-root",
675 "test-root",
676 "--s3-access-key-id",
677 "test-key",
678 "--s3-secret-access-key",
679 "test-secret",
680 "--s3-region",
682 "us-west-2",
683 "--s3-endpoint",
684 "https://s3.amazonaws.com",
685 ]);
686
687 let result = cmd.build().await;
688 assert!(result.is_ok());
689 }
690
691 #[tokio::test]
692 async fn test_export_command_build_with_oss_success() {
693 let cmd = ExportCommand::parse_from([
694 "export",
695 "--addr",
696 "127.0.0.1:4000",
697 "--oss",
698 "--oss-bucket",
699 "test-bucket",
700 "--oss-root",
701 "test-root",
702 "--oss-access-key-id",
703 "test-key-id",
704 "--oss-access-key-secret",
705 "test-secret",
706 "--oss-endpoint",
707 "https://oss.example.com",
708 ]);
709
710 let result = cmd.build().await;
711 assert!(result.is_ok());
712 }
713
714 #[tokio::test]
715 async fn test_export_command_build_with_gcs_success() {
716 let cmd = ExportCommand::parse_from([
717 "export",
718 "--addr",
719 "127.0.0.1:4000",
720 "--gcs",
721 "--gcs-bucket",
722 "test-bucket",
723 "--gcs-root",
724 "test-root",
725 "--gcs-scope",
726 "test-scope",
727 "--gcs-credential-path",
728 "/path/to/credential",
729 "--gcs-credential",
730 "test-credential-content",
731 "--gcs-endpoint",
732 "https://storage.googleapis.com",
733 ]);
734
735 let result = cmd.build().await;
736 assert!(result.is_ok());
737 }
738
739 #[tokio::test]
740 async fn test_export_command_build_with_gcs_adc_success() {
741 let cmd = ExportCommand::parse_from([
743 "export",
744 "--addr",
745 "127.0.0.1:4000",
746 "--gcs",
747 "--gcs-bucket",
748 "test-bucket",
749 "--gcs-root",
750 "test-root",
751 "--gcs-scope",
752 "test-scope",
753 ]);
756
757 let result = cmd.build().await;
758 assert!(result.is_ok());
759 }
760
761 #[tokio::test]
762 async fn test_export_command_build_with_azblob_success() {
763 let cmd = ExportCommand::parse_from([
764 "export",
765 "--addr",
766 "127.0.0.1:4000",
767 "--azblob",
768 "--azblob-container",
769 "test-container",
770 "--azblob-root",
771 "test-root",
772 "--azblob-account-name",
773 "test-account",
774 "--azblob-account-key",
775 MOCK_AZBLOB_ACCOUNT_KEY_B64,
776 "--azblob-endpoint",
777 "https://account.blob.core.windows.net",
778 ]);
779
780 let result = cmd.build().await;
781 assert!(result.is_ok());
782 }
783
784 #[tokio::test]
785 async fn test_export_command_build_with_azblob_with_sas_token() {
786 let cmd = ExportCommand::parse_from([
788 "export",
789 "--addr",
790 "127.0.0.1:4000",
791 "--azblob",
792 "--azblob-container",
793 "test-container",
794 "--azblob-root",
795 "test-root",
796 "--azblob-account-name",
797 "test-account",
798 "--azblob-account-key",
799 MOCK_AZBLOB_ACCOUNT_KEY_B64,
800 "--azblob-endpoint",
801 "https://account.blob.core.windows.net",
802 "--azblob-sas-token",
803 "test-sas-token",
804 ]);
805
806 let result = cmd.build().await;
807 assert!(result.is_ok());
808 }
809
810 #[test]
813 fn test_export_command_build_with_conflict() {
814 let result =
816 ExportCommand::try_parse_from(["export", "--addr", "127.0.0.1:4000", "--s3", "--oss"]);
817
818 assert!(result.is_err());
819 let err = result.unwrap_err();
820 assert!(err.kind() == clap::error::ErrorKind::ArgumentConflict);
822 }
823
824 #[tokio::test]
825 async fn test_export_command_build_with_s3_no_enable_flag() {
826 let result = ExportCommand::try_parse_from([
828 "export",
829 "--addr",
830 "127.0.0.1:4000",
831 "--s3-bucket",
833 "test-bucket",
834 "--s3-access-key-id",
835 "test-key",
836 "--output-dir",
837 "/tmp/test",
838 ]);
839
840 assert!(result.is_err());
841 let err = result.unwrap_err();
842 assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
843 assert!(err.to_string().contains("--s3"));
844 }
845
846 #[tokio::test]
847 async fn test_export_command_build_with_oss_no_enable_flag() {
848 let result = ExportCommand::try_parse_from([
850 "export",
851 "--addr",
852 "127.0.0.1:4000",
853 "--oss-bucket",
854 "test-bucket",
855 "--output-dir",
856 "/tmp/test",
857 ]);
858
859 assert!(result.is_err());
860 let err = result.unwrap_err();
861 assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
862 assert!(err.to_string().contains("--oss"));
863 }
864
865 #[tokio::test]
866 async fn test_export_command_build_with_gcs_no_enable_flag() {
867 let result = ExportCommand::try_parse_from([
869 "export",
870 "--addr",
871 "127.0.0.1:4000",
872 "--gcs-bucket",
873 "test-bucket",
874 "--output-dir",
875 "/tmp/test",
876 ]);
877
878 assert!(result.is_err());
879 let err = result.unwrap_err();
880 assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
881 assert!(err.to_string().contains("--gcs"));
882 }
883
884 #[tokio::test]
885 async fn test_export_command_build_with_azblob_no_enable_flag() {
886 let result = ExportCommand::try_parse_from([
888 "export",
889 "--addr",
890 "127.0.0.1:4000",
891 "--azblob-container",
892 "test-container",
893 "--output-dir",
894 "/tmp/test",
895 ]);
896
897 assert!(result.is_err());
898 let err = result.unwrap_err();
899 assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
900 assert!(err.to_string().contains("--azblob"));
901 }
902
903 #[tokio::test]
906 async fn test_export_command_build_with_s3_empty_root() {
907 let cmd = ExportCommand::parse_from([
909 "export",
910 "--addr",
911 "127.0.0.1:4000",
912 "--s3",
913 "--s3-bucket",
914 "test-bucket",
915 "--s3-root",
916 "", "--s3-access-key-id",
918 "test-key",
919 "--s3-secret-access-key",
920 "test-secret",
921 "--s3-region",
922 "us-west-2",
923 ]);
924
925 let result = cmd.build().await;
926 assert!(
928 result.is_ok(),
929 "Expected success but got: {:?}",
930 result.err()
931 );
932 }
933
934 #[tokio::test]
935 async fn test_export_command_build_with_oss_empty_access_key_id() {
936 let cmd = ExportCommand::parse_from([
938 "export",
939 "--addr",
940 "127.0.0.1:4000",
941 "--oss",
942 "--oss-bucket",
943 "test-bucket",
944 "--oss-access-key-id",
945 "", "--oss-access-key-secret",
947 "test-secret",
948 "--oss-endpoint",
949 "https://oss.example.com",
950 ]);
951
952 let result = cmd.build().await;
953 assert!(result.is_err());
954 if let Err(err) = result {
955 assert!(
956 err.to_string().contains("OSS access key ID must be set"),
957 "Actual error: {}",
958 err
959 );
960 }
961 }
962
963 #[tokio::test]
964 async fn test_export_command_build_with_oss_missing_endpoint() {
965 let cmd = ExportCommand::parse_from([
967 "export",
968 "--addr",
969 "127.0.0.1:4000",
970 "--oss",
971 "--oss-bucket",
972 "test-bucket",
973 "--oss-root",
974 "test-root",
975 "--oss-access-key-id",
976 "test-key-id",
977 "--oss-access-key-secret",
978 "test-secret",
979 ]);
980
981 let result = cmd.build().await;
982 assert!(result.is_err());
983 if let Err(err) = result {
984 assert!(
985 err.to_string().contains("OSS endpoint must be set"),
986 "Actual error: {}",
987 err
988 );
989 }
990 }
991
992 #[tokio::test]
993 async fn test_export_command_build_with_oss_multiple_missing_fields() {
994 let cmd = ExportCommand::parse_from([
996 "export",
997 "--addr",
998 "127.0.0.1:4000",
999 "--oss",
1000 "--oss-bucket",
1001 "test-bucket",
1002 ]);
1004
1005 let result = cmd.build().await;
1006 assert!(result.is_err());
1007 if let Err(err) = result {
1008 let err_str = err.to_string();
1009 assert!(
1011 err_str.contains("OSS"),
1012 "Error should mention OSS: {}",
1013 err_str
1014 );
1015 assert!(
1016 err_str.contains("must be set"),
1017 "Error should mention required fields: {}",
1018 err_str
1019 );
1020 }
1021 }
1022
1023 #[tokio::test]
1024 async fn test_export_command_build_with_gcs_empty_bucket() {
1025 let cmd = ExportCommand::parse_from([
1027 "export",
1028 "--addr",
1029 "127.0.0.1:4000",
1030 "--gcs",
1031 "--gcs-bucket",
1032 "", "--gcs-root",
1034 "test-root",
1035 "--gcs-scope",
1036 "test-scope",
1037 ]);
1038
1039 let result = cmd.build().await;
1040 assert!(result.is_err());
1041 if let Err(err) = result {
1042 assert!(
1043 err.to_string().contains("GCS bucket must be set"),
1044 "Actual error: {}",
1045 err
1046 );
1047 }
1048 }
1049
1050 #[tokio::test]
1051 async fn test_export_command_build_with_gcs_empty_root() {
1052 let cmd = ExportCommand::parse_from([
1054 "export",
1055 "--addr",
1056 "127.0.0.1:4000",
1057 "--gcs",
1058 "--gcs-bucket",
1059 "test-bucket",
1060 "--gcs-root",
1061 "", "--gcs-scope",
1063 "test-scope",
1064 "--gcs-credential-path",
1065 "/path/to/credential",
1066 "--gcs-credential",
1067 "test-credential",
1068 "--gcs-endpoint",
1069 "https://storage.googleapis.com",
1070 ]);
1071
1072 let result = cmd.build().await;
1073 assert!(result.is_err());
1074 if let Err(err) = result {
1075 assert!(
1076 err.to_string().contains("GCS root must be set"),
1077 "Actual error: {}",
1078 err
1079 );
1080 }
1081 }
1082
1083 #[tokio::test]
1084 async fn test_export_command_build_with_azblob_empty_account_name() {
1085 let cmd = ExportCommand::parse_from([
1087 "export",
1088 "--addr",
1089 "127.0.0.1:4000",
1090 "--azblob",
1091 "--azblob-container",
1092 "test-container",
1093 "--azblob-root",
1094 "test-root",
1095 "--azblob-account-name",
1096 "", "--azblob-account-key",
1098 MOCK_AZBLOB_ACCOUNT_KEY_B64,
1099 "--azblob-endpoint",
1100 "https://account.blob.core.windows.net",
1101 ]);
1102
1103 let result = cmd.build().await;
1104 assert!(result.is_err());
1105 if let Err(err) = result {
1106 assert!(
1107 err.to_string().contains("AzBlob account name must be set"),
1108 "Actual error: {}",
1109 err
1110 );
1111 }
1112 }
1113
1114 #[tokio::test]
1115 async fn test_export_command_build_with_azblob_missing_account_key() {
1116 let cmd = ExportCommand::parse_from([
1118 "export",
1119 "--addr",
1120 "127.0.0.1:4000",
1121 "--azblob",
1122 "--azblob-container",
1123 "test-container",
1124 "--azblob-root",
1125 "test-root",
1126 "--azblob-account-name",
1127 "test-account",
1128 "--azblob-endpoint",
1129 "https://account.blob.core.windows.net",
1130 ]);
1131
1132 let result = cmd.build().await;
1133 assert!(result.is_err());
1134 if let Err(err) = result {
1135 assert!(
1136 err.to_string()
1137 .contains("AzBlob account key (when sas_token is not provided) must be set"),
1138 "Actual error: {}",
1139 err
1140 );
1141 }
1142 }
1143
1144 #[tokio::test]
1147 async fn test_export_command_build_with_no_storage() {
1148 let cmd = ExportCommand::parse_from(["export", "--addr", "127.0.0.1:4000"]);
1150
1151 let result = cmd.build().await;
1152 assert!(result.is_err());
1153 if let Err(err) = result {
1154 assert!(
1155 err.to_string().contains("Output directory not set"),
1156 "Actual error: {}",
1157 err
1158 );
1159 }
1160 }
1161
1162 #[tokio::test]
1163 async fn test_export_command_build_with_s3_minimal_config() {
1164 let cmd = ExportCommand::parse_from([
1166 "export",
1167 "--addr",
1168 "127.0.0.1:4000",
1169 "--s3",
1170 "--s3-bucket",
1171 "test-bucket",
1172 "--s3-access-key-id",
1173 "test-key",
1174 "--s3-secret-access-key",
1175 "test-secret",
1176 "--s3-region",
1177 "us-west-2",
1178 ]);
1180
1181 let result = cmd.build().await;
1182 assert!(result.is_ok(), "Minimal S3 config should succeed");
1183 }
1184
1185 #[tokio::test]
1186 async fn test_export_command_build_with_oss_minimal_config() {
1187 let cmd = ExportCommand::parse_from([
1189 "export",
1190 "--addr",
1191 "127.0.0.1:4000",
1192 "--oss",
1193 "--oss-bucket",
1194 "test-bucket",
1195 "--oss-access-key-id",
1196 "test-key-id",
1197 "--oss-access-key-secret",
1198 "test-secret",
1199 "--oss-endpoint",
1200 "https://oss.example.com",
1201 ]);
1203
1204 let result = cmd.build().await;
1205 assert!(result.is_ok(), "Minimal OSS config should succeed");
1206 }
1207
1208 #[tokio::test]
1209 async fn test_export_command_build_with_gcs_minimal_config() {
1210 let cmd = ExportCommand::parse_from([
1212 "export",
1213 "--addr",
1214 "127.0.0.1:4000",
1215 "--gcs",
1216 "--gcs-bucket",
1217 "test-bucket",
1218 "--gcs-root",
1219 "test-root",
1220 "--gcs-scope",
1221 "test-scope",
1222 ]);
1224
1225 let result = cmd.build().await;
1226 assert!(result.is_ok(), "Minimal GCS config should succeed");
1227 }
1228
1229 #[tokio::test]
1230 async fn test_export_command_build_with_azblob_minimal_config() {
1231 let cmd = ExportCommand::parse_from([
1233 "export",
1234 "--addr",
1235 "127.0.0.1:4000",
1236 "--azblob",
1237 "--azblob-container",
1238 "test-container",
1239 "--azblob-root",
1240 "test-root",
1241 "--azblob-account-name",
1242 "test-account",
1243 "--azblob-account-key",
1244 MOCK_AZBLOB_ACCOUNT_KEY_B64,
1245 "--azblob-endpoint",
1246 "https://account.blob.core.windows.net",
1247 ]);
1249
1250 let result = cmd.build().await;
1251 assert!(result.is_ok(), "Minimal AzBlob config should succeed");
1252 }
1253
1254 #[tokio::test]
1255 async fn test_export_command_build_with_local_and_s3() {
1256 let temp_dir = create_temp_dir("test_export_local_and_s3");
1258 let output_dir = temp_dir.path().to_str().unwrap();
1259
1260 let cmd = ExportCommand::parse_from([
1261 "export",
1262 "--addr",
1263 "127.0.0.1:4000",
1264 "--output-dir",
1265 output_dir,
1266 "--s3",
1267 "--s3-bucket",
1268 "test-bucket",
1269 "--s3-access-key-id",
1270 "test-key",
1271 "--s3-secret-access-key",
1272 "test-secret",
1273 "--s3-region",
1274 "us-west-2",
1275 ]);
1276
1277 let result = cmd.build().await;
1278 assert!(
1279 result.is_ok(),
1280 "S3 should be selected when both are provided"
1281 );
1282 }
1283
1284 #[tokio::test]
1287 async fn test_export_command_build_with_azblob_only_sas_token() {
1288 let cmd = ExportCommand::parse_from([
1290 "export",
1291 "--addr",
1292 "127.0.0.1:4000",
1293 "--azblob",
1294 "--azblob-container",
1295 "test-container",
1296 "--azblob-root",
1297 "test-root",
1298 "--azblob-account-name",
1299 "test-account",
1300 "--azblob-endpoint",
1301 "https://account.blob.core.windows.net",
1302 "--azblob-sas-token",
1303 "test-sas-token",
1304 ]);
1306
1307 let result = cmd.build().await;
1308 assert!(
1309 result.is_ok(),
1310 "AzBlob with only sas_token should succeed: {:?}",
1311 result.err()
1312 );
1313 }
1314
1315 #[tokio::test]
1316 async fn test_export_command_build_with_azblob_empty_account_key_with_sas() {
1317 let cmd = ExportCommand::parse_from([
1319 "export",
1320 "--addr",
1321 "127.0.0.1:4000",
1322 "--azblob",
1323 "--azblob-container",
1324 "test-container",
1325 "--azblob-root",
1326 "test-root",
1327 "--azblob-account-name",
1328 "test-account",
1329 "--azblob-account-key",
1330 "", "--azblob-endpoint",
1332 "https://account.blob.core.windows.net",
1333 "--azblob-sas-token",
1334 "test-sas-token",
1335 ]);
1336
1337 let result = cmd.build().await;
1338 assert!(
1339 result.is_ok(),
1340 "AzBlob with empty account_key but sas_token should succeed: {:?}",
1341 result.err()
1342 );
1343 }
1344}