// cli/data/export.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::{Parser, ValueEnum};
21use common_error::ext::BoxedError;
22use common_telemetry::{debug, error, info};
23use object_store::ObjectStore;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::Semaphore;
27use tokio::time::Instant;
28
29use crate::common::{ObjectStoreConfig, new_fs_object_store};
30use crate::data::storage_export::{
31    AzblobBackend, FsBackend, GcsBackend, OssBackend, S3Backend, StorageExport, StorageType,
32};
33use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
34use crate::database::{DatabaseClient, parse_proxy_opts};
35use crate::error::{
36    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, SchemaNotFoundSnafu,
37};
38use crate::{Tool, database};
39
40type TableReference = (String, String, String);
41
/// What the export job should produce; selected via `--target`/`-t`.
#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
    Schema,
    /// Export all table data, corresponding to `COPY DATABASE TO`.
    Data,
    /// Export all table schemas and data at once.
    #[default]
    All,
}
52
/// Command for exporting data from the GreptimeDB.
// NOTE: every `///` field doc below doubles as the clap `--help` text, so its
// wording is user-visible CLI output; plain `//` comments are not.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect
    #[clap(long)]
    addr: String,

    // Optional because a remote backend (--s3/--oss/--gcs/--azblob) may be
    // configured instead; `build()` errors when neither is set.
    /// Directory to put the exported data. E.g.: /tmp/greptimedb-export
    /// for local export.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the catalog to export.
    #[clap(long, default_value_t = default_database())]
    database: String,

    // `--export-jobs` is kept as a backward-compatible alias.
    /// The number of databases exported in parallel.
    /// For example, if there are 20 databases and `db_parallelism` is 4,
    /// 4 databases will be exported concurrently.
    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
    db_parallelism: usize,

    // Passed through to `COPY DATABASE TO` as the `parallelism` option.
    /// The number of tables exported in parallel within a single database.
    /// For example, if a database has 30 tables and `parallelism` is 8,
    /// 8 tables will be exported concurrently.
    #[clap(long, default_value = "4")]
    table_parallelism: usize,

    /// Max retry times for each job.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Things to export
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// A half-open time range: [start_time, end_time).
    /// The start of the time range (time-index column) for data export.
    #[clap(long)]
    start_time: Option<String>,

    /// A half-open time range: [start_time, end_time).
    /// The end of the time range (time-index column) for data export.
    #[clap(long)]
    end_time: Option<String>,

    /// The basic authentication for connecting to the server
    #[clap(long)]
    auth_basic: Option<String>,

    /// The timeout of invoking the database.
    ///
    /// It is used to override the server-side timeout setting.
    /// The default behavior will disable server-side default timeout(i.e. `0s`).
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// The proxy server address to connect.
    ///
    /// If set, it overrides the system proxy unless `--no-proxy` is specified.
    /// If neither `--proxy` nor `--no-proxy` is set, system proxy (env) may be used.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable all proxy usage (ignores `--proxy` and system proxy).
    ///
    /// When set and `--proxy` is not provided, this explicitly disables system proxy.
    #[clap(long)]
    no_proxy: bool,

    /// if both `ddl_local_dir` and remote storage are set, `ddl_local_dir` will be only used for
    /// exported SQL files, and the data will be exported to remote storage.
    ///
    /// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
    /// direct access to remote storage.
    ///
    /// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    // Flattened remote-storage flags (--s3/--oss/--gcs/--azblob and their
    // per-backend options); see `ObjectStoreConfig`.
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
136
impl ExportCommand {
    /// Builds an [`Export`] tool from the parsed CLI arguments.
    ///
    /// Selects the storage backend by checking the `--s3`/`--oss`/`--gcs`/
    /// `--azblob` enable flags in order (clap marks them mutually exclusive
    /// at parse time — see tests), falling back to the local file system when
    /// `--output-dir` is given. Errors with `OutputDirNotSet` when neither a
    /// remote backend nor an output directory is configured.
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        // Determine storage type
        let (storage_type, operator) = if self.storage.enable_s3 {
            (
                StorageType::S3(S3Backend::new(self.storage.s3.clone())?),
                self.storage.build_s3()?,
            )
        } else if self.storage.enable_oss {
            (
                StorageType::Oss(OssBackend::new(self.storage.oss.clone())?),
                self.storage.build_oss()?,
            )
        } else if self.storage.enable_gcs {
            (
                StorageType::Gcs(GcsBackend::new(self.storage.gcs.clone())?),
                self.storage.build_gcs()?,
            )
        } else if self.storage.enable_azblob {
            (
                StorageType::Azblob(AzblobBackend::new(self.storage.azblob.clone())?),
                self.storage.build_azblob()?,
            )
        } else if let Some(output_dir) = &self.output_dir {
            (
                StorageType::Fs(FsBackend::new(output_dir.clone())),
                new_fs_object_store(output_dir)?,
            )
        } else {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        };

        // `database` may be `catalog` or `catalog-schema`; a missing schema
        // means "export every schema in the catalog".
        let (catalog, schema) =
            database::split_database(&self.database).map_err(BoxedError::new)?;
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            catalog.clone(),
            self.auth_basic.clone(),
            // Treats `None` as `0s` to disable server-side default timeout.
            self.timeout.unwrap_or_default(),
            proxy,
            self.no_proxy,
        );

        Ok(Box::new(Export {
            catalog,
            schema,
            database_client,
            export_jobs: self.db_parallelism,
            target: self.target.clone(),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            parallelism: self.table_parallelism,
            storage_type,
            ddl_local_dir: self.ddl_local_dir.clone(),
            operator,
        }))
    }
}
197
/// Runtime state for an export run, built by [`ExportCommand::build`].
///
/// Cloned once per database task so each concurrent job owns its own copy.
#[derive(Clone)]
pub struct Export {
    /// Catalog to export from.
    catalog: String,
    /// Specific schema to export; `None` means all schemas in the catalog.
    schema: Option<String>,
    /// Client used to run SQL statements against the server.
    database_client: DatabaseClient,
    /// Number of databases exported concurrently (`--db-parallelism`).
    export_jobs: usize,
    /// What to export: schemas, data, or both.
    target: ExportTarget,
    /// Inclusive lower bound of the export time range, if any.
    start_time: Option<String>,
    /// Exclusive upper bound of the export time range, if any.
    end_time: Option<String>,
    /// Per-database table parallelism passed to `COPY DATABASE TO`.
    parallelism: usize,
    /// Which storage backend (fs/s3/oss/gcs/azblob) the export writes to.
    storage_type: StorageType,
    /// Optional local directory for DDL SQL files when data goes to remote storage.
    ddl_local_dir: Option<String>,
    /// Object store operator matching `storage_type`.
    operator: ObjectStore,
}
212
impl Export {
    /// Resolves the database (schema) names to export.
    ///
    /// When `self.schema` is set, returns only the matching schema
    /// (compared case-insensitively) or a `SchemaNotFound` error;
    /// otherwise returns every schema from `SHOW DATABASES`.
    async fn get_db_names(&self) -> Result<Vec<String>> {
        let db_names = self.all_db_names().await?;
        let Some(schema) = &self.schema else {
            return Ok(db_names);
        };

        // Check if the schema exists
        db_names
            .into_iter()
            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
            .map(|name| vec![name])
            .context(SchemaNotFoundSnafu {
                catalog: &self.catalog,
                schema,
            })
    }

    /// Iterate over all db names.
    ///
    /// Skips the built-in `information_schema` and `pg_catalog` schemas,
    /// which are system views rather than exportable user data.
    async fn all_db_names(&self) -> Result<Vec<String>> {
        let records = self
            .database_client
            .sql_in_public("SHOW DATABASES")
            .await?
            .context(EmptyResultSnafu)?;
        let mut result = Vec::with_capacity(records.len());
        for value in records {
            // `SHOW DATABASES` returns a single string column per row.
            let Value::String(schema) = &value[0] else {
                unreachable!()
            };
            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                continue;
            }
            if schema == common_catalog::consts::PG_CATALOG_NAME {
                continue;
            }
            result.push(schema.clone());
        }
        Ok(result)
    }

    /// Return a list of [`TableReference`] to be exported.
    /// Includes all tables under the given `catalog` and `schema`.
    ///
    /// Returns `(metric_physical_tables, remaining_tables, views)`.
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Puts all metric table first
        // NOTE(review): tables are identified as metric-engine physical tables
        // by the presence of a `__tsid` column — presumably an internal metric
        // engine column; confirm against the metric engine's schema docs.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
            FROM information_schema.columns \
            WHERE column_name = '__tsid' \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            // Each row carries exactly the three selected string columns.
            let mut t = Vec::with_capacity(3);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
            FROM information_schema.tables \
            WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Ignores the physical table
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }

    /// Runs `SHOW CREATE <show_type>` for a database (when `table` is `None`)
    /// or a fully-qualified table/view, returning the DDL followed by `;\n`.
    async fn show_create(
        &self,
        show_type: &str,
        catalog: &str,
        schema: &str,
        table: Option<&str>,
    ) -> Result<String> {
        let sql = match table {
            Some(table) => format!(
                r#"SHOW CREATE {} "{}"."{}"."{}""#,
                show_type, catalog, schema, table
            ),
            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
        };
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        // The DDL text is the second column of the first result row.
        let Value::String(create) = &records[0][1] else {
            unreachable!()
        };

        Ok(format!("{};\n", create))
    }

    /// Writes one `create_database.sql` per exported schema, containing the
    /// `SHOW CREATE DATABASE` output. DDL files go through the prefer-fs
    /// operator so they can land in `ddl_local_dir` when configured.
    async fn export_create_database(&self) -> Result<()> {
        let timer = Instant::now();
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = self.build_prefer_fs_operator().await?;

        for schema in db_names {
            let create_database = self
                .show_create("DATABASE", &self.catalog, &schema, None)
                .await?;

            let file_path = self.get_file_path(&schema, "create_database.sql");
            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
                .await?;

            info!(
                "Exported {}.{} database creation SQL to {}",
                self.catalog,
                schema,
                self.storage_type.format_output_path(&file_path)
            );
        }

        let elapsed = timer.elapsed();
        info!("Success {db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Writes one `create_tables.sql` per schema with the DDL of all tables
    /// (metric physical tables first) and views. Databases are processed
    /// concurrently, bounded by the `export_jobs` semaphore.
    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // Create directory if needed for file system storage
                if !export_self.storage_type.is_remote_storage() {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                // Add table creation SQL
                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                // Add view creation SQL
                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                // Write to storage
                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.storage_type.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Returns the operator for the configured storage backend.
    async fn build_operator(&self) -> Result<ObjectStore> {
        Ok(self.operator.clone())
    }

    /// build operator with preference for file system
    ///
    /// When data goes to remote storage and `ddl_local_dir` is set, the
    /// returned operator targets that local directory instead, so DDL files
    /// stay on the local file system.
    async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
        if self.storage_type.is_remote_storage()
            && let Some(ddl_local_dir) = &self.ddl_local_dir
        {
            let root = ddl_local_dir.clone();
            let op = new_fs_object_store(&root).map_err(|e| Error::Other {
                source: e,
                location: snafu::location!(),
            })?;
            Ok(op)
        } else {
            Ok(self.operator.clone())
        }
    }

    /// Exports table data per schema via `COPY DATABASE TO`, and writes a
    /// companion `copy_from.sql` for re-import. Databases run concurrently,
    /// bounded by the `export_jobs` semaphore.
    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // Create directory if not using remote storage
                if !export_self.storage_type.is_remote_storage() {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self
                    .storage_type
                    .get_storage_path(&export_self.catalog, &schema);

                // Execute COPY DATABASE TO command
                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                // Log SQL command but mask sensitive information
                let safe_sql = export_self.storage_type.mask_sensitive_info(&sql);
                info!("Executing sql: {}", safe_sql);

                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Create copy_from.sql file
                let copy_database_from_sql = {
                    // Placeholder path so users can fill in their own source;
                    // credentials are never written into this variant.
                    let command_without_connection = format!(
                        r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
                        export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
                    );

                    if connection_part.is_empty() {
                        command_without_connection
                    } else {
                        // Keep the full remote-storage command as a comment above
                        // the placeholder variant for reference.
                        let command_with_connection = format!(
                            r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                            export_self.catalog, schema, path, with_options_clone, connection_part
                        );

                        format!(
                            "-- {}\n{}",
                            command_with_connection, command_without_connection
                        )
                    }
                };

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.storage_type.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");

        Ok(())
    }

    /// Builds the `catalog/schema/file_name` relative path for exported files.
    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
        format!("{}/{}/{}", self.catalog, schema, file_name)
    }

    /// Writes `content` to `file_path` via the given operator, mapping
    /// OpenDAL errors into this crate's error type.
    async fn write_to_storage(
        &self,
        op: &ObjectStore,
        file_path: &str,
        content: Vec<u8>,
    ) -> Result<()> {
        op.write(file_path, content)
            .await
            .context(OpenDalSnafu)
            .map(|_| ())
    }

    /// Awaits all tasks and returns the number that succeeded; failures are
    /// logged but do not abort the remaining jobs.
    async fn execute_tasks(
        &self,
        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
    ) -> usize {
        futures::future::join_all(tasks)
            .await
            .into_iter()
            .filter(|r| match r {
                Ok(_) => true,
                Err(e) => {
                    error!(e; "export job failed");
                    false
                }
            })
            .count()
    }
}
603
604#[async_trait]
605impl Tool for Export {
606    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
607        match self.target {
608            ExportTarget::Schema => {
609                self.export_create_database()
610                    .await
611                    .map_err(BoxedError::new)?;
612                self.export_create_table().await.map_err(BoxedError::new)
613            }
614            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
615            ExportTarget::All => {
616                self.export_create_database()
617                    .await
618                    .map_err(BoxedError::new)?;
619                self.export_create_table().await.map_err(BoxedError::new)?;
620                self.export_database_data().await.map_err(BoxedError::new)
621            }
622        }
623    }
624}
625
/// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports.
///
/// Always emits `format = 'parquet'` first and `parallelism = N` last, with
/// the optional `start_time`/`end_time` bounds in between.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let start = start_time.as_ref().map(|t| format!("start_time = '{t}'"));
    let end = end_time.as_ref().map(|t| format!("end_time = '{t}'"));

    std::iter::once("format = 'parquet'".to_string())
        .chain(start)
        .chain(end)
        .chain(std::iter::once(format!("parallelism = {parallelism}")))
        .collect::<Vec<_>>()
        .join(", ")
}
642
643#[cfg(test)]
644mod tests {
645    use clap::Parser;
646    use common_test_util::temp_dir::create_temp_dir;
647
648    use super::*;
649
    // Base64 encoding of "test-key": Azure account keys must decode as base64.
    const MOCK_AZBLOB_ACCOUNT_KEY_B64: &str = "dGVzdC1rZXk=";

    // ==================== Basic Success Cases ====================

    // `build()` with only `--output-dir` should select the local fs backend.
    #[tokio::test]
    async fn test_export_command_build_with_local_fs() {
        let temp_dir = create_temp_dir("test_export_local_fs");
        let output_dir = temp_dir.path().to_str().unwrap();

        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--output-dir",
            output_dir,
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    // Full S3 configuration (required + optional flags) builds successfully;
    // no network access happens at build time.
    #[tokio::test]
    async fn test_export_command_build_with_s3_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-root",
            "test-root",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            // Optional fields
            "--s3-region",
            "us-west-2",
            "--s3-endpoint",
            "https://s3.amazonaws.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    // Full OSS configuration builds successfully.
    #[tokio::test]
    async fn test_export_command_build_with_oss_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-root",
            "test-root",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    // Full GCS configuration (explicit credential) builds successfully.
    #[tokio::test]
    async fn test_export_command_build_with_gcs_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
            "--gcs-credential",
            "test-credential-content",
            "--gcs-endpoint",
            "https://storage.googleapis.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_adc_success() {
        // Test GCS with Application Default Credentials (no explicit credentials provided)
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
            // No credential
            // No endpoint (optional)
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    // Azure Blob with account-name/account-key credentials builds successfully.
    #[tokio::test]
    async fn test_export_command_build_with_azblob_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_with_sas_token() {
        // Test Azure Blob with SAS token
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }
813
    // ==================== Gap 1: Parse-time dependency checks ====================
    // These verify that clap rejects invalid flag combinations before `build()`
    // is ever reached.

    #[test]
    fn test_export_command_build_with_conflict() {
        // Try to enable both S3 and OSS
        let result =
            ExportCommand::try_parse_from(["export", "--addr", "127.0.0.1:4000", "--s3", "--oss"]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        // clap error for conflicting arguments
        assert!(err.kind() == clap::error::ErrorKind::ArgumentConflict);
    }

    #[tokio::test]
    async fn test_export_command_build_with_s3_no_enable_flag() {
        // Test that providing S3 config without --s3 flag fails
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            // Note: no --s3 flag
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        // The error message should point the user at the missing enable flag.
        assert!(err.to_string().contains("--s3"));
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_no_enable_flag() {
        // Test that providing OSS config without --oss flag fails at parse time
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss-bucket",
            "test-bucket",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--oss"));
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_no_enable_flag() {
        // Test that providing GCS config without --gcs flag fails at parse time
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs-bucket",
            "test-bucket",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--gcs"));
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_no_enable_flag() {
        // Test that providing Azure Blob config without --azblob flag fails at parse time
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob-container",
            "test-container",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--azblob"));
    }
906
907    // ==================== Gap 2: Empty string vs missing tests ====================
908
909    #[tokio::test]
910    async fn test_export_command_build_with_s3_empty_root() {
911        // Empty root should be allowed (it's optional path component)
912        let cmd = ExportCommand::parse_from([
913            "export",
914            "--addr",
915            "127.0.0.1:4000",
916            "--s3",
917            "--s3-bucket",
918            "test-bucket",
919            "--s3-root",
920            "", // Empty root is OK
921            "--s3-access-key-id",
922            "test-key",
923            "--s3-secret-access-key",
924            "test-secret",
925            "--s3-region",
926            "us-west-2",
927        ]);
928
929        let result = cmd.build().await;
930        // Should succeed because root is not a required field
931        assert!(
932            result.is_ok(),
933            "Expected success but got: {:?}",
934            result.err()
935        );
936    }
937
938    #[tokio::test]
939    async fn test_export_command_build_with_oss_empty_access_key_id() {
940        // Test OSS with empty access_key_id (empty string, not missing)
941        let cmd = ExportCommand::parse_from([
942            "export",
943            "--addr",
944            "127.0.0.1:4000",
945            "--oss",
946            "--oss-bucket",
947            "test-bucket",
948            "--oss-access-key-id",
949            "", // Empty string
950            "--oss-access-key-secret",
951            "test-secret",
952            "--oss-endpoint",
953            "https://oss.example.com",
954        ]);
955
956        let result = cmd.build().await;
957        assert!(result.is_err());
958        if let Err(err) = result {
959            assert!(
960                err.to_string().contains("OSS access key ID must be set"),
961                "Actual error: {}",
962                err
963            );
964        }
965    }
966
967    #[tokio::test]
968    async fn test_export_command_build_with_oss_missing_endpoint() {
969        // Missing endpoint
970        let cmd = ExportCommand::parse_from([
971            "export",
972            "--addr",
973            "127.0.0.1:4000",
974            "--oss",
975            "--oss-bucket",
976            "test-bucket",
977            "--oss-root",
978            "test-root",
979            "--oss-access-key-id",
980            "test-key-id",
981            "--oss-access-key-secret",
982            "test-secret",
983        ]);
984
985        let result = cmd.build().await;
986        assert!(result.is_err());
987        if let Err(err) = result {
988            assert!(
989                err.to_string().contains("OSS endpoint must be set"),
990                "Actual error: {}",
991                err
992            );
993        }
994    }
995
996    #[tokio::test]
997    async fn test_export_command_build_with_oss_multiple_missing_fields() {
998        // Test OSS with multiple missing required fields
999        let cmd = ExportCommand::parse_from([
1000            "export",
1001            "--addr",
1002            "127.0.0.1:4000",
1003            "--oss",
1004            "--oss-bucket",
1005            "test-bucket",
1006            // Missing: root, access_key_id, access_key_secret, endpoint
1007        ]);
1008
1009        let result = cmd.build().await;
1010        assert!(result.is_err());
1011        if let Err(err) = result {
1012            let err_str = err.to_string();
1013            // Should mention multiple missing fields
1014            assert!(
1015                err_str.contains("OSS"),
1016                "Error should mention OSS: {}",
1017                err_str
1018            );
1019            assert!(
1020                err_str.contains("must be set"),
1021                "Error should mention required fields: {}",
1022                err_str
1023            );
1024        }
1025    }
1026
1027    #[tokio::test]
1028    async fn test_export_command_build_with_gcs_empty_bucket() {
1029        // Test GCS with empty bucket
1030        let cmd = ExportCommand::parse_from([
1031            "export",
1032            "--addr",
1033            "127.0.0.1:4000",
1034            "--gcs",
1035            "--gcs-bucket",
1036            "", // Empty bucket
1037            "--gcs-root",
1038            "test-root",
1039            "--gcs-scope",
1040            "test-scope",
1041        ]);
1042
1043        let result = cmd.build().await;
1044        assert!(result.is_err());
1045        if let Err(err) = result {
1046            assert!(
1047                err.to_string().contains("GCS bucket must be set"),
1048                "Actual error: {}",
1049                err
1050            );
1051        }
1052    }
1053
1054    #[tokio::test]
1055    async fn test_export_command_build_with_gcs_empty_root() {
1056        // Test GCS when root is missing (should fail as it's required)
1057        let cmd = ExportCommand::parse_from([
1058            "export",
1059            "--addr",
1060            "127.0.0.1:4000",
1061            "--gcs",
1062            "--gcs-bucket",
1063            "test-bucket",
1064            "--gcs-root",
1065            "", // Empty root
1066            "--gcs-scope",
1067            "test-scope",
1068            "--gcs-credential",
1069            "test-credential",
1070            "--gcs-endpoint",
1071            "https://storage.googleapis.com",
1072        ]);
1073
1074        let result = cmd.build().await;
1075        assert!(result.is_err());
1076        if let Err(err) = result {
1077            assert!(
1078                err.to_string().contains("GCS root must be set"),
1079                "Actual error: {}",
1080                err
1081            );
1082        }
1083    }
1084
1085    #[tokio::test]
1086    async fn test_export_command_build_with_azblob_empty_account_name() {
1087        // Test Azure Blob with empty account_name
1088        let cmd = ExportCommand::parse_from([
1089            "export",
1090            "--addr",
1091            "127.0.0.1:4000",
1092            "--azblob",
1093            "--azblob-container",
1094            "test-container",
1095            "--azblob-root",
1096            "test-root",
1097            "--azblob-account-name",
1098            "", // Empty account name
1099            "--azblob-account-key",
1100            MOCK_AZBLOB_ACCOUNT_KEY_B64,
1101            "--azblob-endpoint",
1102            "https://account.blob.core.windows.net",
1103        ]);
1104
1105        let result = cmd.build().await;
1106        assert!(result.is_err());
1107        if let Err(err) = result {
1108            assert!(
1109                err.to_string().contains("AzBlob account name must be set"),
1110                "Actual error: {}",
1111                err
1112            );
1113        }
1114    }
1115
1116    #[tokio::test]
1117    async fn test_export_command_build_with_azblob_missing_account_key() {
1118        // Missing account key
1119        let cmd = ExportCommand::parse_from([
1120            "export",
1121            "--addr",
1122            "127.0.0.1:4000",
1123            "--azblob",
1124            "--azblob-container",
1125            "test-container",
1126            "--azblob-root",
1127            "test-root",
1128            "--azblob-account-name",
1129            "test-account",
1130            "--azblob-endpoint",
1131            "https://account.blob.core.windows.net",
1132        ]);
1133
1134        let result = cmd.build().await;
1135        assert!(result.is_err());
1136        if let Err(err) = result {
1137            assert!(
1138                err.to_string()
1139                    .contains("AzBlob account key (when sas_token is not provided) must be set"),
1140                "Actual error: {}",
1141                err
1142            );
1143        }
1144    }
1145
1146    // ==================== Gap 3: Boundary cases ====================
1147
1148    #[tokio::test]
1149    async fn test_export_command_build_with_no_storage() {
1150        // No output-dir and no backend - should fail
1151        let cmd = ExportCommand::parse_from(["export", "--addr", "127.0.0.1:4000"]);
1152
1153        let result = cmd.build().await;
1154        assert!(result.is_err());
1155        if let Err(err) = result {
1156            assert!(
1157                err.to_string().contains("Output directory not set"),
1158                "Actual error: {}",
1159                err
1160            );
1161        }
1162    }
1163
1164    #[tokio::test]
1165    async fn test_export_command_build_with_s3_minimal_config() {
1166        // S3 with only required fields (no optional fields)
1167        let cmd = ExportCommand::parse_from([
1168            "export",
1169            "--addr",
1170            "127.0.0.1:4000",
1171            "--s3",
1172            "--s3-bucket",
1173            "test-bucket",
1174            "--s3-access-key-id",
1175            "test-key",
1176            "--s3-secret-access-key",
1177            "test-secret",
1178            "--s3-region",
1179            "us-west-2",
1180            // No root, endpoint, or enable_virtual_host_style
1181        ]);
1182
1183        let result = cmd.build().await;
1184        assert!(result.is_ok(), "Minimal S3 config should succeed");
1185    }
1186
1187    #[tokio::test]
1188    async fn test_export_command_build_with_oss_minimal_config() {
1189        // OSS with only required fields
1190        let cmd = ExportCommand::parse_from([
1191            "export",
1192            "--addr",
1193            "127.0.0.1:4000",
1194            "--oss",
1195            "--oss-bucket",
1196            "test-bucket",
1197            "--oss-access-key-id",
1198            "test-key-id",
1199            "--oss-access-key-secret",
1200            "test-secret",
1201            "--oss-endpoint",
1202            "https://oss.example.com",
1203            // No root
1204        ]);
1205
1206        let result = cmd.build().await;
1207        assert!(result.is_ok(), "Minimal OSS config should succeed");
1208    }
1209
1210    #[tokio::test]
1211    async fn test_export_command_build_with_gcs_minimal_config() {
1212        // GCS with only required fields (using ADC)
1213        let cmd = ExportCommand::parse_from([
1214            "export",
1215            "--addr",
1216            "127.0.0.1:4000",
1217            "--gcs",
1218            "--gcs-bucket",
1219            "test-bucket",
1220            "--gcs-root",
1221            "test-root",
1222            "--gcs-scope",
1223            "test-scope",
1224            // No credential, or endpoint
1225        ]);
1226
1227        let result = cmd.build().await;
1228        assert!(result.is_ok(), "Minimal GCS config should succeed");
1229    }
1230
1231    #[tokio::test]
1232    async fn test_export_command_build_with_azblob_minimal_config() {
1233        // Azure Blob with only required fields
1234        let cmd = ExportCommand::parse_from([
1235            "export",
1236            "--addr",
1237            "127.0.0.1:4000",
1238            "--azblob",
1239            "--azblob-container",
1240            "test-container",
1241            "--azblob-root",
1242            "test-root",
1243            "--azblob-account-name",
1244            "test-account",
1245            "--azblob-account-key",
1246            MOCK_AZBLOB_ACCOUNT_KEY_B64,
1247            "--azblob-endpoint",
1248            "https://account.blob.core.windows.net",
1249            // No sas_token
1250        ]);
1251
1252        let result = cmd.build().await;
1253        assert!(result.is_ok(), "Minimal AzBlob config should succeed");
1254    }
1255
1256    #[tokio::test]
1257    async fn test_export_command_build_with_local_and_s3() {
1258        // Both output-dir and S3 - S3 should take precedence
1259        let temp_dir = create_temp_dir("test_export_local_and_s3");
1260        let output_dir = temp_dir.path().to_str().unwrap();
1261
1262        let cmd = ExportCommand::parse_from([
1263            "export",
1264            "--addr",
1265            "127.0.0.1:4000",
1266            "--output-dir",
1267            output_dir,
1268            "--s3",
1269            "--s3-bucket",
1270            "test-bucket",
1271            "--s3-access-key-id",
1272            "test-key",
1273            "--s3-secret-access-key",
1274            "test-secret",
1275            "--s3-region",
1276            "us-west-2",
1277        ]);
1278
1279        let result = cmd.build().await;
1280        assert!(
1281            result.is_ok(),
1282            "S3 should be selected when both are provided"
1283        );
1284    }
1285
1286    // ==================== Gap 4: Custom validation (Azure Blob) ====================
1287
1288    #[tokio::test]
1289    async fn test_export_command_build_with_azblob_only_sas_token() {
1290        // Azure Blob with sas_token but no account_key - should succeed
1291        let cmd = ExportCommand::parse_from([
1292            "export",
1293            "--addr",
1294            "127.0.0.1:4000",
1295            "--azblob",
1296            "--azblob-container",
1297            "test-container",
1298            "--azblob-root",
1299            "test-root",
1300            "--azblob-account-name",
1301            "test-account",
1302            "--azblob-endpoint",
1303            "https://account.blob.core.windows.net",
1304            "--azblob-sas-token",
1305            "test-sas-token",
1306            // No account_key
1307        ]);
1308
1309        let result = cmd.build().await;
1310        assert!(
1311            result.is_ok(),
1312            "AzBlob with only sas_token should succeed: {:?}",
1313            result.err()
1314        );
1315    }
1316
1317    #[tokio::test]
1318    async fn test_export_command_build_with_azblob_empty_account_key_with_sas() {
1319        // Azure Blob with empty account_key but valid sas_token - should succeed
1320        let cmd = ExportCommand::parse_from([
1321            "export",
1322            "--addr",
1323            "127.0.0.1:4000",
1324            "--azblob",
1325            "--azblob-container",
1326            "test-container",
1327            "--azblob-root",
1328            "test-root",
1329            "--azblob-account-name",
1330            "test-account",
1331            "--azblob-account-key",
1332            "", // Empty account_key is OK if sas_token is provided
1333            "--azblob-endpoint",
1334            "https://account.blob.core.windows.net",
1335            "--azblob-sas-token",
1336            "test-sas-token",
1337        ]);
1338
1339        let result = cmd.build().await;
1340        assert!(
1341            result.is_ok(),
1342            "AzBlob with empty account_key but sas_token should succeed: {:?}",
1343            result.err()
1344        );
1345    }
1346}