// cli/data/export_v2/command.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Export V2 CLI commands.
16
17use std::collections::HashSet;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::{Parser, Subcommand};
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26
27use crate::Tool;
28use crate::common::ObjectStoreConfig;
29use crate::data::export_v2::coordinator::export_data;
30use crate::data::export_v2::error::{
31    ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
32    ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
33    SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu,
34};
35use crate::data::export_v2::extractor::SchemaExtractor;
36use crate::data::export_v2::manifest::{
37    ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
38};
39use crate::data::path::ddl_path_for_schema;
40use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
41use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
42use crate::database::{DatabaseClient, parse_proxy_opts};
43
/// Export V2 commands.
///
/// Currently the only subcommand is `Create`; re-running `Create` against an
/// existing snapshot location resumes it (see `ExportCreate::run`).
#[derive(Debug, Subcommand)]
pub enum ExportV2Command {
    /// Create a new snapshot.
    Create(ExportCreateCommand),
}
50
51impl ExportV2Command {
52    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
53        match self {
54            ExportV2Command::Create(cmd) => cmd.build().await,
55        }
56    }
57}
58
/// Create a new snapshot.
///
/// Arguments are validated in `build()`; in particular, data-export flags are
/// rejected when `--schema-only` is set, and `--chunk-time-window` requires a
/// bounded time range.
#[derive(Debug, Parser)]
pub struct ExportCreateCommand {
    /// Server address to connect (e.g., 127.0.0.1:4000).
    #[clap(long)]
    addr: String,

    /// Target storage location (e.g., s3://bucket/path, file:///tmp/backup).
    #[clap(long)]
    to: String,

    /// Catalog name.
    #[clap(long, default_value = "greptime")]
    catalog: String,

    /// Schema list to export (default: all non-system schemas).
    /// Can be specified multiple times or comma-separated.
    #[clap(long, value_delimiter = ',')]
    schemas: Vec<String>,

    /// Export schema only, no data.
    #[clap(long)]
    schema_only: bool,

    /// Time range start (ISO 8601 format, e.g., 2024-01-01T00:00:00Z).
    #[clap(long)]
    start_time: Option<String>,

    /// Time range end (ISO 8601 format, e.g., 2024-12-31T23:59:59Z).
    #[clap(long)]
    end_time: Option<String>,

    /// Chunk time window (e.g., 1h, 6h, 1d, 7d).
    /// Requires both --start-time and --end-time when specified.
    #[clap(long, value_parser = humantime::parse_duration)]
    chunk_time_window: Option<Duration>,

    /// Data format: parquet, csv, json.
    #[clap(long, value_enum, default_value = "parquet")]
    format: DataFormat,

    /// Delete existing snapshot and recreate.
    #[clap(long)]
    force: bool,

    /// Parallelism for COPY DATABASE execution (server-side, per schema per chunk).
    #[clap(long, default_value = "1")]
    parallelism: usize,

    /// Basic authentication (user:password).
    #[clap(long)]
    auth_basic: Option<String>,

    /// Request timeout.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// Proxy server address.
    ///
    /// If set, it overrides the system proxy unless `--no-proxy` is specified.
    /// If neither `--proxy` nor `--no-proxy` is set, system proxy (env) may be used.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable all proxy usage (ignores `--proxy` and system proxy).
    ///
    /// When set and `--proxy` is not provided, this explicitly disables system proxy.
    #[clap(long)]
    no_proxy: bool,

    /// Object store configuration for remote storage backends.
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
133
134impl ExportCreateCommand {
135    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
136        // Validate URI format
137        validate_uri(&self.to).map_err(BoxedError::new)?;
138
139        let time_range = TimeRange::parse(self.start_time.as_deref(), self.end_time.as_deref())
140            .map_err(BoxedError::new)?;
141        if self.chunk_time_window.is_some() && !time_range.is_bounded() {
142            return ChunkTimeWindowRequiresBoundsSnafu
143                .fail()
144                .map_err(BoxedError::new);
145        }
146        if self.schema_only {
147            let mut invalid_args = Vec::new();
148            if self.start_time.is_some() {
149                invalid_args.push("--start-time");
150            }
151            if self.end_time.is_some() {
152                invalid_args.push("--end-time");
153            }
154            if self.chunk_time_window.is_some() {
155                invalid_args.push("--chunk-time-window");
156            }
157            if self.format != DataFormat::Parquet {
158                invalid_args.push("--format");
159            }
160            if self.parallelism != 1 {
161                invalid_args.push("--parallelism");
162            }
163            if !invalid_args.is_empty() {
164                return SchemaOnlyArgsNotAllowedSnafu {
165                    args: invalid_args.join(", "),
166                }
167                .fail()
168                .map_err(BoxedError::new);
169            }
170        }
171
172        // Parse schemas (empty vec means all schemas)
173        let schemas = if self.schemas.is_empty() {
174            None
175        } else {
176            Some(self.schemas.clone())
177        };
178
179        // Build storage
180        let storage = OpenDalStorage::from_uri(&self.to, &self.storage).map_err(BoxedError::new)?;
181
182        // Build database client
183        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
184        let database_client = DatabaseClient::new(
185            self.addr.clone(),
186            self.catalog.clone(),
187            self.auth_basic.clone(),
188            self.timeout.unwrap_or(Duration::from_secs(60)),
189            proxy,
190            self.no_proxy,
191        );
192
193        Ok(Box::new(ExportCreate {
194            config: ExportConfig {
195                catalog: self.catalog.clone(),
196                schemas,
197                schema_only: self.schema_only,
198                format: self.format,
199                force: self.force,
200                time_range,
201                chunk_time_window: self.chunk_time_window,
202                parallelism: self.parallelism,
203                snapshot_uri: self.to.clone(),
204                storage_config: self.storage.clone(),
205            },
206            storage: Box::new(storage),
207            database_client,
208        }))
209    }
210}
211
/// Export tool implementation.
pub struct ExportCreate {
    // Validated export options derived from the CLI arguments.
    config: ExportConfig,
    // Snapshot destination built from the target URI.
    storage: Box<dyn SnapshotStorage>,
    // Client used to run SQL statements against the source server.
    database_client: DatabaseClient,
}
218
/// Validated configuration for a single export run.
struct ExportConfig {
    catalog: String,
    // `None` means "all non-system schemas" (empty `--schemas` on the CLI).
    schemas: Option<Vec<String>>,
    schema_only: bool,
    format: DataFormat,
    force: bool,
    time_range: TimeRange,
    // When set, data is exported chunk-by-chunk over this window.
    chunk_time_window: Option<Duration>,
    // Server-side COPY DATABASE parallelism, per schema per chunk.
    parallelism: usize,
    snapshot_uri: String,
    storage_config: ObjectStoreConfig,
}
231
#[async_trait]
impl Tool for ExportCreate {
    /// Entry point invoked by the CLI framework; wraps domain errors into
    /// `BoxedError` for the caller.
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        self.run().await.map_err(BoxedError::new)
    }
}
238
239impl ExportCreate {
    /// Execute the export.
    ///
    /// If a snapshot already exists at the destination it is either deleted
    /// (`--force`) or resumed after validating that the requested options
    /// match the persisted manifest; otherwise a fresh snapshot is created.
    async fn run(&self) -> Result<()> {
        // 1. Check if snapshot exists
        let exists = self.storage.exists().await?;

        if exists {
            if self.config.force {
                info!("Deleting existing snapshot (--force)");
                self.storage.delete_snapshot().await?;
            } else {
                // Resume mode - read existing manifest
                let mut manifest = self.storage.read_manifest().await?;

                // Check version compatibility
                if manifest.version != MANIFEST_VERSION {
                    return ManifestVersionMismatchSnafu {
                        expected: MANIFEST_VERSION,
                        found: manifest.version,
                    }
                    .fail();
                }

                // Refuse to resume with options that contradict the snapshot.
                validate_resume_config(&manifest, &self.config)?;

                info!(
                    "Resuming existing snapshot: {} (completed: {}/{} chunks)",
                    manifest.snapshot_id,
                    manifest.completed_count(),
                    manifest.chunks.len()
                );

                if manifest.is_complete() {
                    info!("Snapshot is already complete");
                    return Ok(());
                }

                // Schema-only snapshots have no data chunks to export.
                if manifest.schema_only {
                    return Ok(());
                }

                export_data(
                    self.storage.as_ref(),
                    &self.database_client,
                    &self.config.snapshot_uri,
                    &self.config.storage_config,
                    &mut manifest,
                    self.config.parallelism,
                )
                .await?;
                return Ok(());
            }
        }

        // 2. Get schema list
        let extractor = SchemaExtractor::new(&self.database_client, &self.config.catalog);
        let schema_snapshot = extractor.extract(self.config.schemas.as_deref()).await?;

        let schema_names: Vec<String> = schema_snapshot
            .schemas
            .iter()
            .map(|s| s.name.clone())
            .collect();
        info!("Exporting schemas: {:?}", schema_names);

        // 3. Create manifest
        let mut manifest = Manifest::new_for_export(
            self.config.catalog.clone(),
            schema_names.clone(),
            self.config.schema_only,
            self.config.time_range.clone(),
            self.config.format,
            self.config.chunk_time_window,
        )?;

        // 4. Write schema files
        self.storage.write_schema(&schema_snapshot).await?;
        info!("Exported {} schemas", schema_snapshot.schemas.len());

        // 5. Export DDL files for import recovery.
        let ddl_by_schema = self.build_ddl_by_schema(&schema_names).await?;
        for (schema, ddl) in ddl_by_schema {
            let ddl_path = ddl_path_for_schema(&schema);
            self.storage.write_text(&ddl_path, &ddl).await?;
            info!("Exported DDL for schema {} to {}", schema, ddl_path);
        }

        // 6. Write manifest after schema artifacts and before any data export.
        //
        // The manifest is the snapshot commit point: only write it after the schema
        // index and all DDL files are durable, so a crash cannot leave a "valid"
        // snapshot that is missing required schema artifacts. For full exports we
        // still need the manifest before data copy starts, because chunk resume is
        // tracked by updating this manifest in place.
        self.storage.write_manifest(&manifest).await?;
        info!("Snapshot created: {}", manifest.snapshot_id);

        if !self.config.schema_only {
            export_data(
                self.storage.as_ref(),
                &self.database_client,
                &self.config.snapshot_uri,
                &self.config.storage_config,
                &mut manifest,
                self.config.parallelism,
            )
            .await?;
        }

        Ok(())
    }
349
350    async fn build_ddl_by_schema(&self, schema_names: &[String]) -> Result<Vec<(String, String)>> {
351        let mut schemas = schema_names.to_vec();
352        schemas.sort();
353
354        let mut ddl_by_schema = Vec::with_capacity(schemas.len());
355        for schema in schemas {
356            let create_database = self.show_create("DATABASE", &schema, None).await?;
357
358            let (mut physical_tables, mut tables, mut views) =
359                self.get_schema_objects(&schema).await?;
360            physical_tables.sort();
361            let mut physical_ddls = Vec::with_capacity(physical_tables.len());
362            for table in physical_tables {
363                physical_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
364            }
365
366            tables.sort();
367            let mut table_ddls = Vec::with_capacity(tables.len());
368            for table in tables {
369                table_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
370            }
371
372            views.sort();
373            let mut view_ddls = Vec::with_capacity(views.len());
374            for view in views {
375                view_ddls.push(self.show_create("VIEW", &schema, Some(&view)).await?);
376            }
377
378            let ddl = build_schema_ddl(
379                &schema,
380                create_database,
381                physical_ddls,
382                table_ddls,
383                view_ddls,
384            );
385            ddl_by_schema.push((schema, ddl));
386        }
387
388        Ok(ddl_by_schema)
389    }
390
391    async fn get_schema_objects(
392        &self,
393        schema: &str,
394    ) -> Result<(Vec<String>, Vec<String>, Vec<String>)> {
395        let physical_tables = self.get_metric_physical_tables(schema).await?;
396        let physical_set: HashSet<&str> = physical_tables.iter().map(String::as_str).collect();
397        let sql = format!(
398            "SELECT table_name, table_type FROM information_schema.tables \
399             WHERE table_catalog = '{}' AND table_schema = '{}' \
400             AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
401            escape_sql_literal(&self.config.catalog),
402            escape_sql_literal(schema)
403        );
404        let records: Option<Vec<Vec<Value>>> = self
405            .database_client
406            .sql_in_public(&sql)
407            .await
408            .context(DatabaseSnafu)?;
409
410        let mut tables = Vec::new();
411        let mut views = Vec::new();
412        if let Some(rows) = records {
413            for row in rows {
414                let name = match row.first() {
415                    Some(Value::String(name)) => name.clone(),
416                    _ => return UnexpectedValueTypeSnafu.fail(),
417                };
418                let table_type = match row.get(1) {
419                    Some(Value::String(table_type)) => table_type.as_str(),
420                    _ => return UnexpectedValueTypeSnafu.fail(),
421                };
422                if !physical_set.contains(name.as_str()) {
423                    if table_type == "VIEW" {
424                        views.push(name);
425                    } else {
426                        tables.push(name);
427                    }
428                }
429            }
430        }
431
432        Ok((physical_tables, tables, views))
433    }
434
435    async fn get_metric_physical_tables(&self, schema: &str) -> Result<Vec<String>> {
436        let sql = format!(
437            "SELECT DISTINCT table_name FROM information_schema.columns \
438             WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
439            escape_sql_literal(&self.config.catalog),
440            escape_sql_literal(schema)
441        );
442        let records: Option<Vec<Vec<Value>>> = self
443            .database_client
444            .sql_in_public(&sql)
445            .await
446            .context(DatabaseSnafu)?;
447
448        let mut tables = HashSet::new();
449        if let Some(rows) = records {
450            for row in rows {
451                let name = match row.first() {
452                    Some(Value::String(name)) => name.clone(),
453                    _ => return UnexpectedValueTypeSnafu.fail(),
454                };
455                tables.insert(name);
456            }
457        }
458
459        Ok(tables.into_iter().collect())
460    }
461
462    async fn show_create(
463        &self,
464        show_type: &str,
465        schema: &str,
466        table: Option<&str>,
467    ) -> Result<String> {
468        let sql = match table {
469            Some(table) => format!(
470                r#"SHOW CREATE {} "{}"."{}"."{}""#,
471                show_type,
472                escape_sql_identifier(&self.config.catalog),
473                escape_sql_identifier(schema),
474                escape_sql_identifier(table)
475            ),
476            None => format!(
477                r#"SHOW CREATE {} "{}"."{}""#,
478                show_type,
479                escape_sql_identifier(&self.config.catalog),
480                escape_sql_identifier(schema)
481            ),
482        };
483
484        let records: Option<Vec<Vec<Value>>> = self
485            .database_client
486            .sql_in_public(&sql)
487            .await
488            .context(DatabaseSnafu)?;
489        let rows = records.context(EmptyResultSnafu)?;
490        let row = rows.first().context(EmptyResultSnafu)?;
491        let Some(Value::String(create)) = row.get(1) else {
492            return UnexpectedValueTypeSnafu.fail();
493        };
494
495        Ok(format!("{};\n", create))
496    }
497}
498
/// Concatenate the per-schema DDL statements into a single replayable script.
///
/// Order matters for import: database first, then metric physical tables,
/// then regular tables, then views, followed by a trailing newline.
fn build_schema_ddl(
    schema: &str,
    create_database: String,
    physical_tables: Vec<String>,
    tables: Vec<String>,
    views: Vec<String>,
) -> String {
    let header = format!("-- Schema: {}\n", schema);
    let statements: String = std::iter::once(create_database)
        .chain(physical_tables)
        .chain(tables)
        .chain(views)
        .collect();
    format!("{}{}\n", header, statements)
}
521
/// Reject a resume attempt whose options disagree with the existing manifest.
///
/// Each check compares the persisted manifest against the freshly parsed CLI
/// configuration and aborts with a descriptive error on the first mismatch,
/// rather than silently producing an inconsistent snapshot.
fn validate_resume_config(manifest: &Manifest, config: &ExportConfig) -> Result<()> {
    if manifest.schema_only != config.schema_only {
        return SchemaOnlyModeMismatchSnafu {
            existing_schema_only: manifest.schema_only,
            requested_schema_only: config.schema_only,
        }
        .fail();
    }

    if manifest.catalog != config.catalog {
        return ResumeConfigMismatchSnafu {
            field: "catalog",
            existing: manifest.catalog.clone(),
            requested: config.catalog.clone(),
        }
        .fail();
    }

    // If no schema filter is provided on resume, inherit the existing snapshot
    // selection instead of reinterpreting the request as "all schemas".
    if let Some(requested_schemas) = &config.schemas
        && !schema_selection_matches(&manifest.schemas, requested_schemas)
    {
        return ResumeConfigMismatchSnafu {
            field: "schemas",
            existing: format_schema_selection(&manifest.schemas),
            requested: format_schema_selection(requested_schemas),
        }
        .fail();
    }

    if manifest.time_range != config.time_range {
        return ResumeConfigMismatchSnafu {
            field: "time_range",
            existing: format!("{:?}", manifest.time_range),
            requested: format!("{:?}", config.time_range),
        }
        .fail();
    }

    if manifest.format != config.format {
        return ResumeConfigMismatchSnafu {
            field: "format",
            existing: manifest.format.to_string(),
            requested: config.format.to_string(),
        }
        .fail();
    }

    // Re-derive the chunk plan from the requested options and require it to
    // match the persisted plan chunk-for-chunk, so the manifest's completed-
    // chunk bookkeeping stays meaningful across the resume.
    let expected_plan = Manifest::new_for_export(
        manifest.catalog.clone(),
        manifest.schemas.clone(),
        config.schema_only,
        config.time_range.clone(),
        config.format,
        config.chunk_time_window,
    )?;
    if !chunk_plan_matches(manifest, &expected_plan) {
        return ResumeConfigMismatchSnafu {
            field: "chunk plan",
            existing: format_chunk_plan(&manifest.chunks),
            requested: format_chunk_plan(&expected_plan.chunks),
        }
        .fail();
    }

    Ok(())
}
590
/// True when two schema selections name the same set of schemas, ignoring
/// case, order, and duplicates.
fn schema_selection_matches(existing: &[String], requested: &[String]) -> bool {
    canonical_schema_selection(existing) == canonical_schema_selection(requested)
}
594
/// Normalize a schema selection for comparison: ASCII-lowercased,
/// deduplicated, and sorted.
fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
    let mut normalized: Vec<String> = schemas
        .iter()
        .map(|schema| schema.to_ascii_lowercase())
        .collect();
    normalized.sort();
    normalized.dedup();
    normalized
}
609
/// Render a schema list as `[a, b, c]` for error messages.
fn format_schema_selection(schemas: &[String]) -> String {
    let mut rendered = String::from("[");
    rendered.push_str(&schemas.join(", "));
    rendered.push(']');
    rendered
}
613
614fn chunk_plan_matches(existing: &Manifest, expected: &Manifest) -> bool {
615    existing.chunks.len() == expected.chunks.len()
616        && existing
617            .chunks
618            .iter()
619            .zip(&expected.chunks)
620            .all(|(left, right)| left.id == right.id && left.time_range == right.time_range)
621}
622
623fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
624    let items = chunks
625        .iter()
626        .map(|chunk| format!("#{}:{:?}", chunk.id, chunk.time_range))
627        .collect::<Vec<_>>();
628    format!("[{}]", items.join(", "))
629}
630
#[cfg(test)]
mod tests {
    use chrono::TimeZone;
    use clap::Parser;

    use super::*;
    use crate::data::path::ddl_path_for_schema;

    // Schema names are percent-encoded so hostile names cannot escape the
    // snapshot's schema/ddl directory.
    #[test]
    fn test_ddl_path_for_schema() {
        assert_eq!(ddl_path_for_schema("public"), "schema/ddl/public.sql");
        assert_eq!(
            ddl_path_for_schema("../evil"),
            "schema/ddl/%2E%2E%2Fevil.sql"
        );
    }

    // Statements must appear database -> physical tables -> tables -> views
    // so the script replays cleanly on import.
    #[test]
    fn test_build_schema_ddl_order() {
        let ddl = build_schema_ddl(
            "public",
            "CREATE DATABASE public;\n".to_string(),
            vec!["PHYSICAL;\n".to_string()],
            vec!["TABLE;\n".to_string()],
            vec!["VIEW;\n".to_string()],
        );

        let db_pos = ddl.find("CREATE DATABASE").unwrap();
        let physical_pos = ddl.find("PHYSICAL;").unwrap();
        let table_pos = ddl.find("TABLE;").unwrap();
        let view_pos = ddl.find("VIEW;").unwrap();
        assert!(db_pos < physical_pos);
        assert!(physical_pos < table_pos);
        assert!(table_pos < view_pos);
    }

    // --chunk-time-window without both --start-time and --end-time is invalid.
    #[tokio::test]
    async fn test_build_rejects_chunk_window_without_bounds() {
        let cmd = ExportCreateCommand::parse_from([
            "export-v2-create",
            "--addr",
            "127.0.0.1:4000",
            "--to",
            "file:///tmp/export-v2-test",
            "--chunk-time-window",
            "1h",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        let error = result.err().unwrap().to_string();

        assert!(error.contains("chunk_time_window requires both --start-time and --end-time"));
    }

    // All offending data-export flags should be reported in a single error.
    #[tokio::test]
    async fn test_build_rejects_data_export_args_in_schema_only_mode() {
        let cmd = ExportCreateCommand::parse_from([
            "export-v2-create",
            "--addr",
            "127.0.0.1:4000",
            "--to",
            "file:///tmp/export-v2-test",
            "--schema-only",
            "--start-time",
            "2024-01-01T00:00:00Z",
            "--end-time",
            "2024-01-02T00:00:00Z",
            "--chunk-time-window",
            "1h",
            "--format",
            "csv",
            "--parallelism",
            "2",
        ]);

        let error = cmd.build().await.err().unwrap().to_string();

        assert!(error.contains("--schema-only cannot be used with data export arguments"));
        assert!(error.contains("--start-time"));
        assert!(error.contains("--end-time"));
        assert!(error.contains("--chunk-time-window"));
        assert!(error.contains("--format"));
        assert!(error.contains("--parallelism"));
    }

    #[test]
    fn test_schema_only_mode_mismatch_error_message() {
        let error = crate::data::export_v2::error::SchemaOnlyModeMismatchSnafu {
            existing_schema_only: false,
            requested_schema_only: true,
        }
        .build()
        .to_string();

        assert!(error.contains("existing: false"));
        assert!(error.contains("requested: true"));
    }

    #[test]
    fn test_validate_resume_config_rejects_catalog_mismatch() {
        let manifest = Manifest::new_for_export(
            "greptime".to_string(),
            vec!["public".to_string()],
            false,
            TimeRange::unbounded(),
            DataFormat::Parquet,
            None,
        )
        .unwrap();
        let config = ExportConfig {
            catalog: "other".to_string(),
            schemas: None,
            schema_only: false,
            format: DataFormat::Parquet,
            force: false,
            time_range: TimeRange::unbounded(),
            chunk_time_window: None,
            parallelism: 1,
            snapshot_uri: "file:///tmp/snapshot".to_string(),
            storage_config: ObjectStoreConfig::default(),
        };

        let error = validate_resume_config(&manifest, &config)
            .err()
            .unwrap()
            .to_string();
        assert!(error.contains("catalog"));
    }

    // Schema selection comparison ignores case, order, and duplicates.
    #[test]
    fn test_validate_resume_config_accepts_schema_selection_with_different_case_and_order() {
        let manifest = Manifest::new_for_export(
            "greptime".to_string(),
            vec!["public".to_string(), "analytics".to_string()],
            false,
            TimeRange::unbounded(),
            DataFormat::Parquet,
            None,
        )
        .unwrap();
        let config = ExportConfig {
            catalog: "greptime".to_string(),
            schemas: Some(vec![
                "ANALYTICS".to_string(),
                "PUBLIC".to_string(),
                "public".to_string(),
            ]),
            schema_only: false,
            format: DataFormat::Parquet,
            force: false,
            time_range: TimeRange::unbounded(),
            chunk_time_window: None,
            parallelism: 1,
            snapshot_uri: "file:///tmp/snapshot".to_string(),
            storage_config: ObjectStoreConfig::default(),
        };

        assert!(validate_resume_config(&manifest, &config).is_ok());
    }

    // Requesting a chunk window on resume changes the derived chunk plan and
    // must be rejected when the manifest was created without one.
    #[test]
    fn test_validate_resume_config_rejects_chunk_plan_mismatch() {
        let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap();
        let time_range = TimeRange::new(Some(start), Some(end));
        let manifest = Manifest::new_for_export(
            "greptime".to_string(),
            vec!["public".to_string()],
            false,
            time_range.clone(),
            DataFormat::Parquet,
            None,
        )
        .unwrap();
        let config = ExportConfig {
            catalog: "greptime".to_string(),
            schemas: None,
            schema_only: false,
            format: DataFormat::Parquet,
            force: false,
            time_range,
            chunk_time_window: Some(Duration::from_secs(3600)),
            parallelism: 1,
            snapshot_uri: "file:///tmp/snapshot".to_string(),
            storage_config: ObjectStoreConfig::default(),
        };

        let error = validate_resume_config(&manifest, &config)
            .err()
            .unwrap()
            .to_string();
        assert!(error.contains("chunk plan"));
    }

    #[test]
    fn test_validate_resume_config_rejects_format_mismatch() {
        let manifest = Manifest::new_for_export(
            "greptime".to_string(),
            vec!["public".to_string()],
            false,
            TimeRange::unbounded(),
            DataFormat::Parquet,
            None,
        )
        .unwrap();
        let config = ExportConfig {
            catalog: "greptime".to_string(),
            schemas: None,
            schema_only: false,
            format: DataFormat::Csv,
            force: false,
            time_range: TimeRange::unbounded(),
            chunk_time_window: None,
            parallelism: 1,
            snapshot_uri: "file:///tmp/snapshot".to_string(),
            storage_config: ObjectStoreConfig::default(),
        };

        let error = validate_resume_config(&manifest, &config)
            .err()
            .unwrap()
            .to_string();
        assert!(error.contains("format"));
    }

    #[test]
    fn test_validate_resume_config_rejects_time_range_mismatch() {
        let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
        let manifest = Manifest::new_for_export(
            "greptime".to_string(),
            vec!["public".to_string()],
            false,
            TimeRange::new(Some(start), Some(end)),
            DataFormat::Parquet,
            None,
        )
        .unwrap();
        let config = ExportConfig {
            catalog: "greptime".to_string(),
            schemas: None,
            schema_only: false,
            format: DataFormat::Parquet,
            force: false,
            time_range: TimeRange::new(Some(start), Some(start)),
            chunk_time_window: None,
            parallelism: 1,
            snapshot_uri: "file:///tmp/snapshot".to_string(),
            storage_config: ObjectStoreConfig::default(),
        };

        let error = validate_resume_config(&manifest, &config)
            .err()
            .unwrap()
            .to_string();
        assert!(error.contains("time_range"));
    }
}
889}