cli/data/
export.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::{Parser, ValueEnum};
21use common_error::ext::BoxedError;
22use common_telemetry::{debug, error, info};
23use object_store::ObjectStore;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::Semaphore;
27use tokio::time::Instant;
28
29use crate::common::{ObjectStoreConfig, new_fs_object_store};
30use crate::data::storage_export::{
31    AzblobBackend, FsBackend, GcsBackend, OssBackend, S3Backend, StorageExport, StorageType,
32};
33use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
34use crate::database::{DatabaseClient, parse_proxy_opts};
35use crate::error::{
36    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, SchemaNotFoundSnafu,
37};
38use crate::{Tool, database};
39
40type TableReference = (String, String, String);
41
// What the export job produces; selected via `--target`/`-t` on the CLI.
// NOTE: clap turns `///` doc comments into --help text, so reviewer notes
// here use `//` to keep the generated help unchanged.
#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
    Schema,
    /// Export all table data, corresponding to `COPY DATABASE TO`.
    Data,
    /// Export all table schemas and data at once.
    #[default]
    All,
}
52
/// Command for exporting data from the GreptimeDB.
// NOTE: this is a clap `Parser`; `///` doc comments on fields become --help
// text, so all reviewer annotations below use plain `//` comments to avoid
// changing the generated CLI help.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect
    #[clap(long)]
    addr: String,

    /// Directory to put the exported data. E.g.: /tmp/greptimedb-export
    /// for local export.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the catalog to export.
    #[clap(long, default_value_t = default_database())]
    database: String,

    /// The number of databases exported in parallel.
    /// For example, if there are 20 databases and `db_parallelism` is 4,
    /// 4 databases will be exported concurrently.
    // `-j` short flag; `--export-jobs` kept as a backward-compatible alias.
    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
    db_parallelism: usize,

    /// The number of tables exported in parallel within a single database.
    /// For example, if a database has 30 tables and `parallelism` is 8,
    /// 8 tables will be exported concurrently.
    #[clap(long, default_value = "4")]
    table_parallelism: usize,

    /// Max retry times for each job.
    // NOTE(review): `max_retry` is parsed but `build()` does not forward it
    // into `Export` — confirm whether retries are intentionally unimplemented.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Things to export
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// A half-open time range: [start_time, end_time).
    /// The start of the time range (time-index column) for data export.
    #[clap(long)]
    start_time: Option<String>,

    /// A half-open time range: [start_time, end_time).
    /// The end of the time range (time-index column) for data export.
    #[clap(long)]
    end_time: Option<String>,

    /// The basic authentication for connecting to the server
    #[clap(long)]
    auth_basic: Option<String>,

    /// The timeout of invoking the database.
    ///
    /// It is used to override the server-side timeout setting.
    /// The default behavior will disable server-side default timeout(i.e. `0s`).
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// The proxy server address to connect, if set, will override the system proxy.
    ///
    /// The default behavior will use the system proxy if neither `proxy` nor `no_proxy` is set.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable proxy server, if set, will not use any proxy.
    #[clap(long)]
    no_proxy: bool,

    /// if both `ddl_local_dir` and remote storage are set, `ddl_local_dir` will be only used for
    /// exported SQL files, and the data will be exported to remote storage.
    ///
    /// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
    /// direct access to remote storage.
    ///
    /// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    // Flattened backend flags: --s3/--oss/--gcs/--azblob groups plus their
    // credentials; exactly one backend (or --output-dir) is expected.
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
133
134impl ExportCommand {
135    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
136        // Determine storage type
137        let (storage_type, operator) = if self.storage.enable_s3 {
138            (
139                StorageType::S3(S3Backend::new(self.storage.s3.clone())?),
140                self.storage.build_s3()?,
141            )
142        } else if self.storage.enable_oss {
143            (
144                StorageType::Oss(OssBackend::new(self.storage.oss.clone())?),
145                self.storage.build_oss()?,
146            )
147        } else if self.storage.enable_gcs {
148            (
149                StorageType::Gcs(GcsBackend::new(self.storage.gcs.clone())?),
150                self.storage.build_gcs()?,
151            )
152        } else if self.storage.enable_azblob {
153            (
154                StorageType::Azblob(AzblobBackend::new(self.storage.azblob.clone())?),
155                self.storage.build_azblob()?,
156            )
157        } else if let Some(output_dir) = &self.output_dir {
158            (
159                StorageType::Fs(FsBackend::new(output_dir.clone())),
160                new_fs_object_store(output_dir)?,
161            )
162        } else {
163            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
164        };
165
166        let (catalog, schema) =
167            database::split_database(&self.database).map_err(BoxedError::new)?;
168        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
169        let database_client = DatabaseClient::new(
170            self.addr.clone(),
171            catalog.clone(),
172            self.auth_basic.clone(),
173            // Treats `None` as `0s` to disable server-side default timeout.
174            self.timeout.unwrap_or_default(),
175            proxy,
176        );
177
178        Ok(Box::new(Export {
179            catalog,
180            schema,
181            database_client,
182            export_jobs: self.db_parallelism,
183            target: self.target.clone(),
184            start_time: self.start_time.clone(),
185            end_time: self.end_time.clone(),
186            parallelism: self.table_parallelism,
187            storage_type,
188            ddl_local_dir: self.ddl_local_dir.clone(),
189            operator,
190        }))
191    }
192}
193
/// Runtime state for an export run; cloned cheaply into each per-database task.
#[derive(Clone)]
pub struct Export {
    // Catalog that owns the exported schemas.
    catalog: String,
    // Specific schema to export; `None` means all non-system schemas.
    schema: Option<String>,
    // Client used to issue SQL statements against the server.
    database_client: DatabaseClient,
    // Number of databases exported concurrently (`--db-parallelism`).
    export_jobs: usize,
    // What to export: schemas, data, or both.
    target: ExportTarget,
    // Optional half-open time range [start_time, end_time) for data export.
    start_time: Option<String>,
    end_time: Option<String>,
    // Per-database table parallelism, forwarded as the COPY `parallelism` option.
    parallelism: usize,
    // Selected storage backend (remote object store or local file system).
    storage_type: StorageType,
    // When set together with remote storage, SQL files are written locally here.
    ddl_local_dir: Option<String>,
    // Object-store handle used to write exported files.
    operator: ObjectStore,
}
208
209impl Export {
210    async fn get_db_names(&self) -> Result<Vec<String>> {
211        let db_names = self.all_db_names().await?;
212        let Some(schema) = &self.schema else {
213            return Ok(db_names);
214        };
215
216        // Check if the schema exists
217        db_names
218            .into_iter()
219            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
220            .map(|name| vec![name])
221            .context(SchemaNotFoundSnafu {
222                catalog: &self.catalog,
223                schema,
224            })
225    }
226
227    /// Iterate over all db names.
228    async fn all_db_names(&self) -> Result<Vec<String>> {
229        let records = self
230            .database_client
231            .sql_in_public("SHOW DATABASES")
232            .await?
233            .context(EmptyResultSnafu)?;
234        let mut result = Vec::with_capacity(records.len());
235        for value in records {
236            let Value::String(schema) = &value[0] else {
237                unreachable!()
238            };
239            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
240                continue;
241            }
242            if schema == common_catalog::consts::PG_CATALOG_NAME {
243                continue;
244            }
245            result.push(schema.clone());
246        }
247        Ok(result)
248    }
249
250    /// Return a list of [`TableReference`] to be exported.
251    /// Includes all tables under the given `catalog` and `schema`.
252    async fn get_table_list(
253        &self,
254        catalog: &str,
255        schema: &str,
256    ) -> Result<(
257        Vec<TableReference>,
258        Vec<TableReference>,
259        Vec<TableReference>,
260    )> {
261        // Puts all metric table first
262        let sql = format!(
263            "SELECT table_catalog, table_schema, table_name \
264            FROM information_schema.columns \
265            WHERE column_name = '__tsid' \
266                and table_catalog = \'{catalog}\' \
267                and table_schema = \'{schema}\'"
268        );
269        let records = self
270            .database_client
271            .sql_in_public(&sql)
272            .await?
273            .context(EmptyResultSnafu)?;
274        let mut metric_physical_tables = HashSet::with_capacity(records.len());
275        for value in records {
276            let mut t = Vec::with_capacity(3);
277            for v in &value {
278                let Value::String(value) = v else {
279                    unreachable!()
280                };
281                t.push(value);
282            }
283            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
284        }
285
286        let sql = format!(
287            "SELECT table_catalog, table_schema, table_name, table_type \
288            FROM information_schema.tables \
289            WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
290                and table_catalog = \'{catalog}\' \
291                and table_schema = \'{schema}\'",
292        );
293        let records = self
294            .database_client
295            .sql_in_public(&sql)
296            .await?
297            .context(EmptyResultSnafu)?;
298
299        debug!("Fetched table/view list: {:?}", records);
300
301        if records.is_empty() {
302            return Ok((vec![], vec![], vec![]));
303        }
304
305        let mut remaining_tables = Vec::with_capacity(records.len());
306        let mut views = Vec::new();
307        for value in records {
308            let mut t = Vec::with_capacity(4);
309            for v in &value {
310                let Value::String(value) = v else {
311                    unreachable!()
312                };
313                t.push(value);
314            }
315            let table = (t[0].clone(), t[1].clone(), t[2].clone());
316            let table_type = t[3].as_str();
317            // Ignores the physical table
318            if !metric_physical_tables.contains(&table) {
319                if table_type == "VIEW" {
320                    views.push(table);
321                } else {
322                    remaining_tables.push(table);
323                }
324            }
325        }
326
327        Ok((
328            metric_physical_tables.into_iter().collect(),
329            remaining_tables,
330            views,
331        ))
332    }
333
334    async fn show_create(
335        &self,
336        show_type: &str,
337        catalog: &str,
338        schema: &str,
339        table: Option<&str>,
340    ) -> Result<String> {
341        let sql = match table {
342            Some(table) => format!(
343                r#"SHOW CREATE {} "{}"."{}"."{}""#,
344                show_type, catalog, schema, table
345            ),
346            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
347        };
348        let records = self
349            .database_client
350            .sql_in_public(&sql)
351            .await?
352            .context(EmptyResultSnafu)?;
353        let Value::String(create) = &records[0][1] else {
354            unreachable!()
355        };
356
357        Ok(format!("{};\n", create))
358    }
359
360    async fn export_create_database(&self) -> Result<()> {
361        let timer = Instant::now();
362        let db_names = self.get_db_names().await?;
363        let db_count = db_names.len();
364        let operator = self.build_prefer_fs_operator().await?;
365
366        for schema in db_names {
367            let create_database = self
368                .show_create("DATABASE", &self.catalog, &schema, None)
369                .await?;
370
371            let file_path = self.get_file_path(&schema, "create_database.sql");
372            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
373                .await?;
374
375            info!(
376                "Exported {}.{} database creation SQL to {}",
377                self.catalog,
378                schema,
379                self.storage_type.format_output_path(&file_path)
380            );
381        }
382
383        let elapsed = timer.elapsed();
384        info!("Success {db_count} jobs, cost: {elapsed:?}");
385
386        Ok(())
387    }
388
389    async fn export_create_table(&self) -> Result<()> {
390        let timer = Instant::now();
391        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
392        let db_names = self.get_db_names().await?;
393        let db_count = db_names.len();
394        let operator = Arc::new(self.build_prefer_fs_operator().await?);
395        let mut tasks = Vec::with_capacity(db_names.len());
396
397        for schema in db_names {
398            let semaphore_moved = semaphore.clone();
399            let export_self = self.clone();
400            let operator = operator.clone();
401            tasks.push(async move {
402                let _permit = semaphore_moved.acquire().await.unwrap();
403                let (metric_physical_tables, remaining_tables, views) = export_self
404                    .get_table_list(&export_self.catalog, &schema)
405                    .await?;
406
407                // Create directory if needed for file system storage
408                if !export_self.storage_type.is_remote_storage() {
409                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
410                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
411                }
412
413                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
414                let mut content = Vec::new();
415
416                // Add table creation SQL
417                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
418                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
419                    content.extend_from_slice(create_table.as_bytes());
420                }
421
422                // Add view creation SQL
423                for (c, s, v) in &views {
424                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
425                    content.extend_from_slice(create_view.as_bytes());
426                }
427
428                // Write to storage
429                export_self
430                    .write_to_storage(&operator, &file_path, content)
431                    .await?;
432
433                info!(
434                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
435                    export_self.catalog,
436                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
437                    export_self.storage_type.format_output_path(&file_path)
438                );
439
440                Ok::<(), Error>(())
441            });
442        }
443
444        let success = self.execute_tasks(tasks).await;
445        let elapsed = timer.elapsed();
446        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
447
448        Ok(())
449    }
450
451    async fn build_operator(&self) -> Result<ObjectStore> {
452        Ok(self.operator.clone())
453    }
454
455    /// build operator with preference for file system
456    async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
457        if self.storage_type.is_remote_storage() && self.ddl_local_dir.is_some() {
458            let root = self.ddl_local_dir.as_ref().unwrap().clone();
459            let op = new_fs_object_store(&root).map_err(|e| Error::Other {
460                source: e,
461                location: snafu::location!(),
462            })?;
463            Ok(op)
464        } else {
465            Ok(self.operator.clone())
466        }
467    }
468
469    async fn export_database_data(&self) -> Result<()> {
470        let timer = Instant::now();
471        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
472        let db_names = self.get_db_names().await?;
473        let db_count = db_names.len();
474        let mut tasks = Vec::with_capacity(db_count);
475        let operator = Arc::new(self.build_operator().await?);
476        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
477        let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);
478
479        for schema in db_names {
480            let semaphore_moved = semaphore.clone();
481            let export_self = self.clone();
482            let with_options_clone = with_options.clone();
483            let operator = operator.clone();
484            let fs_first_operator = fs_first_operator.clone();
485
486            tasks.push(async move {
487                let _permit = semaphore_moved.acquire().await.unwrap();
488
489                // Create directory if not using remote storage
490                if !export_self.storage_type.is_remote_storage() {
491                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
492                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
493                }
494
495                let (path, connection_part) = export_self
496                    .storage_type
497                    .get_storage_path(&export_self.catalog, &schema);
498
499                // Execute COPY DATABASE TO command
500                let sql = format!(
501                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
502                    export_self.catalog, schema, path, with_options_clone, connection_part
503                );
504
505                // Log SQL command but mask sensitive information
506                let safe_sql = export_self.storage_type.mask_sensitive_info(&sql);
507                info!("Executing sql: {}", safe_sql);
508
509                export_self.database_client.sql_in_public(&sql).await?;
510                info!(
511                    "Finished exporting {}.{} data to {}",
512                    export_self.catalog, schema, path
513                );
514
515                // Create copy_from.sql file
516                let copy_database_from_sql = {
517                    let command_without_connection = format!(
518                        r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
519                        export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
520                    );
521
522                    if connection_part.is_empty() {
523                        command_without_connection
524                    } else {
525                        let command_with_connection = format!(
526                            r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
527                            export_self.catalog, schema, path, with_options_clone, connection_part
528                        );
529
530                        format!(
531                            "-- {}\n{}",
532                            command_with_connection, command_without_connection
533                        )
534                    }
535                };
536
537                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
538                export_self
539                    .write_to_storage(
540                        &fs_first_operator,
541                        &copy_from_path,
542                        copy_database_from_sql.into_bytes(),
543                    )
544                    .await?;
545
546                info!(
547                    "Finished exporting {}.{} copy_from.sql to {}",
548                    export_self.catalog,
549                    schema,
550                    export_self.storage_type.format_output_path(&copy_from_path)
551                );
552
553                Ok::<(), Error>(())
554            });
555        }
556
557        let success = self.execute_tasks(tasks).await;
558        let elapsed = timer.elapsed();
559        info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
560
561        Ok(())
562    }
563
564    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
565        format!("{}/{}/{}", self.catalog, schema, file_name)
566    }
567
568    async fn write_to_storage(
569        &self,
570        op: &ObjectStore,
571        file_path: &str,
572        content: Vec<u8>,
573    ) -> Result<()> {
574        op.write(file_path, content)
575            .await
576            .context(OpenDalSnafu)
577            .map(|_| ())
578    }
579
580    async fn execute_tasks(
581        &self,
582        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
583    ) -> usize {
584        futures::future::join_all(tasks)
585            .await
586            .into_iter()
587            .filter(|r| match r {
588                Ok(_) => true,
589                Err(e) => {
590                    error!(e; "export job failed");
591                    false
592                }
593            })
594            .count()
595    }
596}
597
598#[async_trait]
599impl Tool for Export {
600    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
601        match self.target {
602            ExportTarget::Schema => {
603                self.export_create_database()
604                    .await
605                    .map_err(BoxedError::new)?;
606                self.export_create_table().await.map_err(BoxedError::new)
607            }
608            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
609            ExportTarget::All => {
610                self.export_create_database()
611                    .await
612                    .map_err(BoxedError::new)?;
613                self.export_create_table().await.map_err(BoxedError::new)?;
614                self.export_database_data().await.map_err(BoxedError::new)
615            }
616        }
617    }
618}
619
/// Assembles the `WITH (...)` option list shared by the `COPY DATABASE`
/// TO/FROM statements, assuming consistent syntax across S3 and local exports.
///
/// Always emits the parquet format and the parallelism option; the optional
/// half-open time-range bounds are included only when provided, in
/// `start_time`, `end_time` order.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let bounds = [("start_time", start_time), ("end_time", end_time)];
    let mut parts = vec!["format = 'parquet'".to_string()];
    parts.extend(
        bounds
            .iter()
            .filter_map(|(key, value)| value.as_ref().map(|v| format!("{key} = '{v}'"))),
    );
    parts.push(format!("parallelism = {parallelism}"));
    parts.join(", ")
}
636
637#[cfg(test)]
638mod tests {
639    use clap::Parser;
640    use common_test_util::temp_dir::create_temp_dir;
641
642    use super::*;
643
644    const MOCK_AZBLOB_ACCOUNT_KEY_B64: &str = "dGVzdC1rZXk=";
645
646    // ==================== Basic Success Cases ====================
647
    #[tokio::test]
    async fn test_export_command_build_with_local_fs() {
        // With only `--addr` and `--output-dir`, build() should select the
        // local file-system backend and succeed.
        let temp_dir = create_temp_dir("test_export_local_fs");
        let output_dir = temp_dir.path().to_str().unwrap();

        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--output-dir",
            output_dir,
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }
664
    #[tokio::test]
    async fn test_export_command_build_with_s3_success() {
        // A fully-specified S3 backend should build without needing
        // `--output-dir`.
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-root",
            "test-root",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            // Optional fields
            "--s3-region",
            "us-west-2",
            "--s3-endpoint",
            "https://s3.amazonaws.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }
690
    #[tokio::test]
    async fn test_export_command_build_with_oss_success() {
        // A fully-specified OSS backend should build successfully.
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-root",
            "test-root",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }
713
    #[tokio::test]
    async fn test_export_command_build_with_gcs_success() {
        // A GCS backend with explicit credentials should build successfully.
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
            "--gcs-credential-path",
            "/path/to/credential",
            "--gcs-credential",
            "test-credential-content",
            "--gcs-endpoint",
            "https://storage.googleapis.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }
738
    #[tokio::test]
    async fn test_export_command_build_with_gcs_adc_success() {
        // Test GCS with Application Default Credentials (no explicit credentials provided)
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
            // No credential_path or credential
            // No endpoint (optional)
        ]);

        // Building must still succeed; credentials are resolved at runtime.
        let result = cmd.build().await;
        assert!(result.is_ok());
    }
760
    #[tokio::test]
    async fn test_export_command_build_with_azblob_success() {
        // An Azure Blob backend with account-name/key auth should build.
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }
783
    #[tokio::test]
    async fn test_export_command_build_with_azblob_with_sas_token() {
        // Test Azure Blob with SAS token
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ]);

        // A SAS token alongside the account key must still build cleanly.
        let result = cmd.build().await;
        assert!(result.is_ok());
    }
809
810    // ==================== Gap 1: Parse-time dependency checks ====================
811
    #[test]
    fn test_export_command_build_with_conflict() {
        // Try to enable both S3 and OSS
        let result =
            ExportCommand::try_parse_from(["export", "--addr", "127.0.0.1:4000", "--s3", "--oss"]);

        // Backend flags are mutually exclusive, so parsing must fail.
        assert!(result.is_err());
        let err = result.unwrap_err();
        // clap error for conflicting arguments
        assert!(err.kind() == clap::error::ErrorKind::ArgumentConflict);
    }
823
    #[tokio::test]
    async fn test_export_command_build_with_s3_no_enable_flag() {
        // Test that providing S3 config without --s3 flag fails
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            // Note: no --s3 flag
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--output-dir",
            "/tmp/test",
        ]);

        // S3 sub-options require the enabling `--s3` flag at parse time.
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--s3"));
    }
845
    #[tokio::test]
    async fn test_export_command_build_with_oss_no_enable_flag() {
        // Test that providing OSS config without --oss flag fails at parse time
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss-bucket",
            "test-bucket",
            "--output-dir",
            "/tmp/test",
        ]);

        // OSS sub-options require the enabling `--oss` flag.
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--oss"));
    }
864
    #[tokio::test]
    async fn test_export_command_build_with_gcs_no_enable_flag() {
        // Test that providing GCS config without --gcs flag fails at parse time
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs-bucket",
            "test-bucket",
            "--output-dir",
            "/tmp/test",
        ]);

        // GCS sub-options require the enabling `--gcs` flag.
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--gcs"));
    }
883
884    #[tokio::test]
885    async fn test_export_command_build_with_azblob_no_enable_flag() {
886        // Test that providing Azure Blob config without --azblob flag fails at parse time
887        let result = ExportCommand::try_parse_from([
888            "export",
889            "--addr",
890            "127.0.0.1:4000",
891            "--azblob-container",
892            "test-container",
893            "--output-dir",
894            "/tmp/test",
895        ]);
896
897        assert!(result.is_err());
898        let err = result.unwrap_err();
899        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
900        assert!(err.to_string().contains("--azblob"));
901    }
902
903    // ==================== Gap 2: Empty string vs missing tests ====================
904
905    #[tokio::test]
906    async fn test_export_command_build_with_s3_empty_root() {
907        // Empty root should be allowed (it's optional path component)
908        let cmd = ExportCommand::parse_from([
909            "export",
910            "--addr",
911            "127.0.0.1:4000",
912            "--s3",
913            "--s3-bucket",
914            "test-bucket",
915            "--s3-root",
916            "", // Empty root is OK
917            "--s3-access-key-id",
918            "test-key",
919            "--s3-secret-access-key",
920            "test-secret",
921            "--s3-region",
922            "us-west-2",
923        ]);
924
925        let result = cmd.build().await;
926        // Should succeed because root is not a required field
927        assert!(
928            result.is_ok(),
929            "Expected success but got: {:?}",
930            result.err()
931        );
932    }
933
934    #[tokio::test]
935    async fn test_export_command_build_with_oss_empty_access_key_id() {
936        // Test OSS with empty access_key_id (empty string, not missing)
937        let cmd = ExportCommand::parse_from([
938            "export",
939            "--addr",
940            "127.0.0.1:4000",
941            "--oss",
942            "--oss-bucket",
943            "test-bucket",
944            "--oss-access-key-id",
945            "", // Empty string
946            "--oss-access-key-secret",
947            "test-secret",
948            "--oss-endpoint",
949            "https://oss.example.com",
950        ]);
951
952        let result = cmd.build().await;
953        assert!(result.is_err());
954        if let Err(err) = result {
955            assert!(
956                err.to_string().contains("OSS access key ID must be set"),
957                "Actual error: {}",
958                err
959            );
960        }
961    }
962
963    #[tokio::test]
964    async fn test_export_command_build_with_oss_missing_endpoint() {
965        // Missing endpoint
966        let cmd = ExportCommand::parse_from([
967            "export",
968            "--addr",
969            "127.0.0.1:4000",
970            "--oss",
971            "--oss-bucket",
972            "test-bucket",
973            "--oss-root",
974            "test-root",
975            "--oss-access-key-id",
976            "test-key-id",
977            "--oss-access-key-secret",
978            "test-secret",
979        ]);
980
981        let result = cmd.build().await;
982        assert!(result.is_err());
983        if let Err(err) = result {
984            assert!(
985                err.to_string().contains("OSS endpoint must be set"),
986                "Actual error: {}",
987                err
988            );
989        }
990    }
991
992    #[tokio::test]
993    async fn test_export_command_build_with_oss_multiple_missing_fields() {
994        // Test OSS with multiple missing required fields
995        let cmd = ExportCommand::parse_from([
996            "export",
997            "--addr",
998            "127.0.0.1:4000",
999            "--oss",
1000            "--oss-bucket",
1001            "test-bucket",
1002            // Missing: root, access_key_id, access_key_secret, endpoint
1003        ]);
1004
1005        let result = cmd.build().await;
1006        assert!(result.is_err());
1007        if let Err(err) = result {
1008            let err_str = err.to_string();
1009            // Should mention multiple missing fields
1010            assert!(
1011                err_str.contains("OSS"),
1012                "Error should mention OSS: {}",
1013                err_str
1014            );
1015            assert!(
1016                err_str.contains("must be set"),
1017                "Error should mention required fields: {}",
1018                err_str
1019            );
1020        }
1021    }
1022
1023    #[tokio::test]
1024    async fn test_export_command_build_with_gcs_empty_bucket() {
1025        // Test GCS with empty bucket
1026        let cmd = ExportCommand::parse_from([
1027            "export",
1028            "--addr",
1029            "127.0.0.1:4000",
1030            "--gcs",
1031            "--gcs-bucket",
1032            "", // Empty bucket
1033            "--gcs-root",
1034            "test-root",
1035            "--gcs-scope",
1036            "test-scope",
1037        ]);
1038
1039        let result = cmd.build().await;
1040        assert!(result.is_err());
1041        if let Err(err) = result {
1042            assert!(
1043                err.to_string().contains("GCS bucket must be set"),
1044                "Actual error: {}",
1045                err
1046            );
1047        }
1048    }
1049
1050    #[tokio::test]
1051    async fn test_export_command_build_with_gcs_empty_root() {
1052        // Test GCS when root is missing (should fail as it's required)
1053        let cmd = ExportCommand::parse_from([
1054            "export",
1055            "--addr",
1056            "127.0.0.1:4000",
1057            "--gcs",
1058            "--gcs-bucket",
1059            "test-bucket",
1060            "--gcs-root",
1061            "", // Empty root
1062            "--gcs-scope",
1063            "test-scope",
1064            "--gcs-credential-path",
1065            "/path/to/credential",
1066            "--gcs-credential",
1067            "test-credential",
1068            "--gcs-endpoint",
1069            "https://storage.googleapis.com",
1070        ]);
1071
1072        let result = cmd.build().await;
1073        assert!(result.is_err());
1074        if let Err(err) = result {
1075            assert!(
1076                err.to_string().contains("GCS root must be set"),
1077                "Actual error: {}",
1078                err
1079            );
1080        }
1081    }
1082
1083    #[tokio::test]
1084    async fn test_export_command_build_with_azblob_empty_account_name() {
1085        // Test Azure Blob with empty account_name
1086        let cmd = ExportCommand::parse_from([
1087            "export",
1088            "--addr",
1089            "127.0.0.1:4000",
1090            "--azblob",
1091            "--azblob-container",
1092            "test-container",
1093            "--azblob-root",
1094            "test-root",
1095            "--azblob-account-name",
1096            "", // Empty account name
1097            "--azblob-account-key",
1098            MOCK_AZBLOB_ACCOUNT_KEY_B64,
1099            "--azblob-endpoint",
1100            "https://account.blob.core.windows.net",
1101        ]);
1102
1103        let result = cmd.build().await;
1104        assert!(result.is_err());
1105        if let Err(err) = result {
1106            assert!(
1107                err.to_string().contains("AzBlob account name must be set"),
1108                "Actual error: {}",
1109                err
1110            );
1111        }
1112    }
1113
1114    #[tokio::test]
1115    async fn test_export_command_build_with_azblob_missing_account_key() {
1116        // Missing account key
1117        let cmd = ExportCommand::parse_from([
1118            "export",
1119            "--addr",
1120            "127.0.0.1:4000",
1121            "--azblob",
1122            "--azblob-container",
1123            "test-container",
1124            "--azblob-root",
1125            "test-root",
1126            "--azblob-account-name",
1127            "test-account",
1128            "--azblob-endpoint",
1129            "https://account.blob.core.windows.net",
1130        ]);
1131
1132        let result = cmd.build().await;
1133        assert!(result.is_err());
1134        if let Err(err) = result {
1135            assert!(
1136                err.to_string()
1137                    .contains("AzBlob account key (when sas_token is not provided) must be set"),
1138                "Actual error: {}",
1139                err
1140            );
1141        }
1142    }
1143
1144    // ==================== Gap 3: Boundary cases ====================
1145
1146    #[tokio::test]
1147    async fn test_export_command_build_with_no_storage() {
1148        // No output-dir and no backend - should fail
1149        let cmd = ExportCommand::parse_from(["export", "--addr", "127.0.0.1:4000"]);
1150
1151        let result = cmd.build().await;
1152        assert!(result.is_err());
1153        if let Err(err) = result {
1154            assert!(
1155                err.to_string().contains("Output directory not set"),
1156                "Actual error: {}",
1157                err
1158            );
1159        }
1160    }
1161
1162    #[tokio::test]
1163    async fn test_export_command_build_with_s3_minimal_config() {
1164        // S3 with only required fields (no optional fields)
1165        let cmd = ExportCommand::parse_from([
1166            "export",
1167            "--addr",
1168            "127.0.0.1:4000",
1169            "--s3",
1170            "--s3-bucket",
1171            "test-bucket",
1172            "--s3-access-key-id",
1173            "test-key",
1174            "--s3-secret-access-key",
1175            "test-secret",
1176            "--s3-region",
1177            "us-west-2",
1178            // No root, endpoint, or enable_virtual_host_style
1179        ]);
1180
1181        let result = cmd.build().await;
1182        assert!(result.is_ok(), "Minimal S3 config should succeed");
1183    }
1184
1185    #[tokio::test]
1186    async fn test_export_command_build_with_oss_minimal_config() {
1187        // OSS with only required fields
1188        let cmd = ExportCommand::parse_from([
1189            "export",
1190            "--addr",
1191            "127.0.0.1:4000",
1192            "--oss",
1193            "--oss-bucket",
1194            "test-bucket",
1195            "--oss-access-key-id",
1196            "test-key-id",
1197            "--oss-access-key-secret",
1198            "test-secret",
1199            "--oss-endpoint",
1200            "https://oss.example.com",
1201            // No root
1202        ]);
1203
1204        let result = cmd.build().await;
1205        assert!(result.is_ok(), "Minimal OSS config should succeed");
1206    }
1207
1208    #[tokio::test]
1209    async fn test_export_command_build_with_gcs_minimal_config() {
1210        // GCS with only required fields (using ADC)
1211        let cmd = ExportCommand::parse_from([
1212            "export",
1213            "--addr",
1214            "127.0.0.1:4000",
1215            "--gcs",
1216            "--gcs-bucket",
1217            "test-bucket",
1218            "--gcs-root",
1219            "test-root",
1220            "--gcs-scope",
1221            "test-scope",
1222            // No credential_path, credential, or endpoint
1223        ]);
1224
1225        let result = cmd.build().await;
1226        assert!(result.is_ok(), "Minimal GCS config should succeed");
1227    }
1228
1229    #[tokio::test]
1230    async fn test_export_command_build_with_azblob_minimal_config() {
1231        // Azure Blob with only required fields
1232        let cmd = ExportCommand::parse_from([
1233            "export",
1234            "--addr",
1235            "127.0.0.1:4000",
1236            "--azblob",
1237            "--azblob-container",
1238            "test-container",
1239            "--azblob-root",
1240            "test-root",
1241            "--azblob-account-name",
1242            "test-account",
1243            "--azblob-account-key",
1244            MOCK_AZBLOB_ACCOUNT_KEY_B64,
1245            "--azblob-endpoint",
1246            "https://account.blob.core.windows.net",
1247            // No sas_token
1248        ]);
1249
1250        let result = cmd.build().await;
1251        assert!(result.is_ok(), "Minimal AzBlob config should succeed");
1252    }
1253
1254    #[tokio::test]
1255    async fn test_export_command_build_with_local_and_s3() {
1256        // Both output-dir and S3 - S3 should take precedence
1257        let temp_dir = create_temp_dir("test_export_local_and_s3");
1258        let output_dir = temp_dir.path().to_str().unwrap();
1259
1260        let cmd = ExportCommand::parse_from([
1261            "export",
1262            "--addr",
1263            "127.0.0.1:4000",
1264            "--output-dir",
1265            output_dir,
1266            "--s3",
1267            "--s3-bucket",
1268            "test-bucket",
1269            "--s3-access-key-id",
1270            "test-key",
1271            "--s3-secret-access-key",
1272            "test-secret",
1273            "--s3-region",
1274            "us-west-2",
1275        ]);
1276
1277        let result = cmd.build().await;
1278        assert!(
1279            result.is_ok(),
1280            "S3 should be selected when both are provided"
1281        );
1282    }
1283
1284    // ==================== Gap 4: Custom validation (Azure Blob) ====================
1285
1286    #[tokio::test]
1287    async fn test_export_command_build_with_azblob_only_sas_token() {
1288        // Azure Blob with sas_token but no account_key - should succeed
1289        let cmd = ExportCommand::parse_from([
1290            "export",
1291            "--addr",
1292            "127.0.0.1:4000",
1293            "--azblob",
1294            "--azblob-container",
1295            "test-container",
1296            "--azblob-root",
1297            "test-root",
1298            "--azblob-account-name",
1299            "test-account",
1300            "--azblob-endpoint",
1301            "https://account.blob.core.windows.net",
1302            "--azblob-sas-token",
1303            "test-sas-token",
1304            // No account_key
1305        ]);
1306
1307        let result = cmd.build().await;
1308        assert!(
1309            result.is_ok(),
1310            "AzBlob with only sas_token should succeed: {:?}",
1311            result.err()
1312        );
1313    }
1314
1315    #[tokio::test]
1316    async fn test_export_command_build_with_azblob_empty_account_key_with_sas() {
1317        // Azure Blob with empty account_key but valid sas_token - should succeed
1318        let cmd = ExportCommand::parse_from([
1319            "export",
1320            "--addr",
1321            "127.0.0.1:4000",
1322            "--azblob",
1323            "--azblob-container",
1324            "test-container",
1325            "--azblob-root",
1326            "test-root",
1327            "--azblob-account-name",
1328            "test-account",
1329            "--azblob-account-key",
1330            "", // Empty account_key is OK if sas_token is provided
1331            "--azblob-endpoint",
1332            "https://account.blob.core.windows.net",
1333            "--azblob-sas-token",
1334            "test-sas-token",
1335        ]);
1336
1337        let result = cmd.build().await;
1338        assert!(
1339            result.is_ok(),
1340            "AzBlob with empty account_key but sas_token should succeed: {:?}",
1341            result.err()
1342        );
1343    }
1344}