1use std::collections::HashSet;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::{Parser, Subcommand};
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26
27use crate::Tool;
28use crate::common::ObjectStoreConfig;
29use crate::data::export_v2::coordinator::export_data;
30use crate::data::export_v2::error::{
31 ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
32 ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
33 SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu,
34};
35use crate::data::export_v2::extractor::SchemaExtractor;
36use crate::data::export_v2::manifest::{
37 ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
38};
39use crate::data::path::ddl_path_for_schema;
40use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
41use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
42use crate::database::{DatabaseClient, parse_proxy_opts};
43
44#[derive(Debug, Subcommand)]
46pub enum ExportV2Command {
47 Create(ExportCreateCommand),
49}
50
51impl ExportV2Command {
52 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
53 match self {
54 ExportV2Command::Create(cmd) => cmd.build().await,
55 }
56 }
57}
58
59#[derive(Debug, Parser)]
61pub struct ExportCreateCommand {
62 #[clap(long)]
64 addr: String,
65
66 #[clap(long)]
68 to: String,
69
70 #[clap(long, default_value = "greptime")]
72 catalog: String,
73
74 #[clap(long, value_delimiter = ',')]
77 schemas: Vec<String>,
78
79 #[clap(long)]
81 schema_only: bool,
82
83 #[clap(long)]
85 start_time: Option<String>,
86
87 #[clap(long)]
89 end_time: Option<String>,
90
91 #[clap(long, value_parser = humantime::parse_duration)]
94 chunk_time_window: Option<Duration>,
95
96 #[clap(long, value_enum, default_value = "parquet")]
98 format: DataFormat,
99
100 #[clap(long)]
102 force: bool,
103
104 #[clap(long, default_value = "1")]
106 parallelism: usize,
107
108 #[clap(long)]
110 auth_basic: Option<String>,
111
112 #[clap(long, value_parser = humantime::parse_duration)]
114 timeout: Option<Duration>,
115
116 #[clap(long)]
121 proxy: Option<String>,
122
123 #[clap(long)]
127 no_proxy: bool,
128
129 #[clap(flatten)]
131 storage: ObjectStoreConfig,
132}
133
134impl ExportCreateCommand {
135 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
136 validate_uri(&self.to).map_err(BoxedError::new)?;
138
139 let time_range = TimeRange::parse(self.start_time.as_deref(), self.end_time.as_deref())
140 .map_err(BoxedError::new)?;
141 if self.chunk_time_window.is_some() && !time_range.is_bounded() {
142 return ChunkTimeWindowRequiresBoundsSnafu
143 .fail()
144 .map_err(BoxedError::new);
145 }
146 if self.schema_only {
147 let mut invalid_args = Vec::new();
148 if self.start_time.is_some() {
149 invalid_args.push("--start-time");
150 }
151 if self.end_time.is_some() {
152 invalid_args.push("--end-time");
153 }
154 if self.chunk_time_window.is_some() {
155 invalid_args.push("--chunk-time-window");
156 }
157 if self.format != DataFormat::Parquet {
158 invalid_args.push("--format");
159 }
160 if self.parallelism != 1 {
161 invalid_args.push("--parallelism");
162 }
163 if !invalid_args.is_empty() {
164 return SchemaOnlyArgsNotAllowedSnafu {
165 args: invalid_args.join(", "),
166 }
167 .fail()
168 .map_err(BoxedError::new);
169 }
170 }
171
172 let schemas = if self.schemas.is_empty() {
174 None
175 } else {
176 Some(self.schemas.clone())
177 };
178
179 let storage = OpenDalStorage::from_uri(&self.to, &self.storage).map_err(BoxedError::new)?;
181
182 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
184 let database_client = DatabaseClient::new(
185 self.addr.clone(),
186 self.catalog.clone(),
187 self.auth_basic.clone(),
188 self.timeout.unwrap_or(Duration::from_secs(60)),
189 proxy,
190 self.no_proxy,
191 );
192
193 Ok(Box::new(ExportCreate {
194 config: ExportConfig {
195 catalog: self.catalog.clone(),
196 schemas,
197 schema_only: self.schema_only,
198 format: self.format,
199 force: self.force,
200 time_range,
201 chunk_time_window: self.chunk_time_window,
202 parallelism: self.parallelism,
203 snapshot_uri: self.to.clone(),
204 storage_config: self.storage.clone(),
205 },
206 storage: Box::new(storage),
207 database_client,
208 }))
209 }
210}
211
212pub struct ExportCreate {
214 config: ExportConfig,
215 storage: Box<dyn SnapshotStorage>,
216 database_client: DatabaseClient,
217}
218
219struct ExportConfig {
220 catalog: String,
221 schemas: Option<Vec<String>>,
222 schema_only: bool,
223 format: DataFormat,
224 force: bool,
225 time_range: TimeRange,
226 chunk_time_window: Option<Duration>,
227 parallelism: usize,
228 snapshot_uri: String,
229 storage_config: ObjectStoreConfig,
230}
231
232#[async_trait]
233impl Tool for ExportCreate {
234 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
235 self.run().await.map_err(BoxedError::new)
236 }
237}
238
239impl ExportCreate {
240 async fn run(&self) -> Result<()> {
241 let exists = self.storage.exists().await?;
243
244 if exists {
245 if self.config.force {
246 info!("Deleting existing snapshot (--force)");
247 self.storage.delete_snapshot().await?;
248 } else {
249 let mut manifest = self.storage.read_manifest().await?;
251
252 if manifest.version != MANIFEST_VERSION {
254 return ManifestVersionMismatchSnafu {
255 expected: MANIFEST_VERSION,
256 found: manifest.version,
257 }
258 .fail();
259 }
260
261 validate_resume_config(&manifest, &self.config)?;
262
263 info!(
264 "Resuming existing snapshot: {} (completed: {}/{} chunks)",
265 manifest.snapshot_id,
266 manifest.completed_count(),
267 manifest.chunks.len()
268 );
269
270 if manifest.is_complete() {
271 info!("Snapshot is already complete");
272 return Ok(());
273 }
274
275 if manifest.schema_only {
276 return Ok(());
277 }
278
279 export_data(
280 self.storage.as_ref(),
281 &self.database_client,
282 &self.config.snapshot_uri,
283 &self.config.storage_config,
284 &mut manifest,
285 self.config.parallelism,
286 )
287 .await?;
288 return Ok(());
289 }
290 }
291
292 let extractor = SchemaExtractor::new(&self.database_client, &self.config.catalog);
294 let schema_snapshot = extractor.extract(self.config.schemas.as_deref()).await?;
295
296 let schema_names: Vec<String> = schema_snapshot
297 .schemas
298 .iter()
299 .map(|s| s.name.clone())
300 .collect();
301 info!("Exporting schemas: {:?}", schema_names);
302
303 let mut manifest = Manifest::new_for_export(
305 self.config.catalog.clone(),
306 schema_names.clone(),
307 self.config.schema_only,
308 self.config.time_range.clone(),
309 self.config.format,
310 self.config.chunk_time_window,
311 )?;
312
313 self.storage.write_schema(&schema_snapshot).await?;
315 info!("Exported {} schemas", schema_snapshot.schemas.len());
316
317 let ddl_by_schema = self.build_ddl_by_schema(&schema_names).await?;
319 for (schema, ddl) in ddl_by_schema {
320 let ddl_path = ddl_path_for_schema(&schema);
321 self.storage.write_text(&ddl_path, &ddl).await?;
322 info!("Exported DDL for schema {} to {}", schema, ddl_path);
323 }
324
325 self.storage.write_manifest(&manifest).await?;
333 info!("Snapshot created: {}", manifest.snapshot_id);
334
335 if !self.config.schema_only {
336 export_data(
337 self.storage.as_ref(),
338 &self.database_client,
339 &self.config.snapshot_uri,
340 &self.config.storage_config,
341 &mut manifest,
342 self.config.parallelism,
343 )
344 .await?;
345 }
346
347 Ok(())
348 }
349
350 async fn build_ddl_by_schema(&self, schema_names: &[String]) -> Result<Vec<(String, String)>> {
351 let mut schemas = schema_names.to_vec();
352 schemas.sort();
353
354 let mut ddl_by_schema = Vec::with_capacity(schemas.len());
355 for schema in schemas {
356 let create_database = self.show_create("DATABASE", &schema, None).await?;
357
358 let (mut physical_tables, mut tables, mut views) =
359 self.get_schema_objects(&schema).await?;
360 physical_tables.sort();
361 let mut physical_ddls = Vec::with_capacity(physical_tables.len());
362 for table in physical_tables {
363 physical_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
364 }
365
366 tables.sort();
367 let mut table_ddls = Vec::with_capacity(tables.len());
368 for table in tables {
369 table_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
370 }
371
372 views.sort();
373 let mut view_ddls = Vec::with_capacity(views.len());
374 for view in views {
375 view_ddls.push(self.show_create("VIEW", &schema, Some(&view)).await?);
376 }
377
378 let ddl = build_schema_ddl(
379 &schema,
380 create_database,
381 physical_ddls,
382 table_ddls,
383 view_ddls,
384 );
385 ddl_by_schema.push((schema, ddl));
386 }
387
388 Ok(ddl_by_schema)
389 }
390
391 async fn get_schema_objects(
392 &self,
393 schema: &str,
394 ) -> Result<(Vec<String>, Vec<String>, Vec<String>)> {
395 let physical_tables = self.get_metric_physical_tables(schema).await?;
396 let physical_set: HashSet<&str> = physical_tables.iter().map(String::as_str).collect();
397 let sql = format!(
398 "SELECT table_name, table_type FROM information_schema.tables \
399 WHERE table_catalog = '{}' AND table_schema = '{}' \
400 AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
401 escape_sql_literal(&self.config.catalog),
402 escape_sql_literal(schema)
403 );
404 let records: Option<Vec<Vec<Value>>> = self
405 .database_client
406 .sql_in_public(&sql)
407 .await
408 .context(DatabaseSnafu)?;
409
410 let mut tables = Vec::new();
411 let mut views = Vec::new();
412 if let Some(rows) = records {
413 for row in rows {
414 let name = match row.first() {
415 Some(Value::String(name)) => name.clone(),
416 _ => return UnexpectedValueTypeSnafu.fail(),
417 };
418 let table_type = match row.get(1) {
419 Some(Value::String(table_type)) => table_type.as_str(),
420 _ => return UnexpectedValueTypeSnafu.fail(),
421 };
422 if !physical_set.contains(name.as_str()) {
423 if table_type == "VIEW" {
424 views.push(name);
425 } else {
426 tables.push(name);
427 }
428 }
429 }
430 }
431
432 Ok((physical_tables, tables, views))
433 }
434
435 async fn get_metric_physical_tables(&self, schema: &str) -> Result<Vec<String>> {
436 let sql = format!(
437 "SELECT DISTINCT table_name FROM information_schema.columns \
438 WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
439 escape_sql_literal(&self.config.catalog),
440 escape_sql_literal(schema)
441 );
442 let records: Option<Vec<Vec<Value>>> = self
443 .database_client
444 .sql_in_public(&sql)
445 .await
446 .context(DatabaseSnafu)?;
447
448 let mut tables = HashSet::new();
449 if let Some(rows) = records {
450 for row in rows {
451 let name = match row.first() {
452 Some(Value::String(name)) => name.clone(),
453 _ => return UnexpectedValueTypeSnafu.fail(),
454 };
455 tables.insert(name);
456 }
457 }
458
459 Ok(tables.into_iter().collect())
460 }
461
462 async fn show_create(
463 &self,
464 show_type: &str,
465 schema: &str,
466 table: Option<&str>,
467 ) -> Result<String> {
468 let sql = match table {
469 Some(table) => format!(
470 r#"SHOW CREATE {} "{}"."{}"."{}""#,
471 show_type,
472 escape_sql_identifier(&self.config.catalog),
473 escape_sql_identifier(schema),
474 escape_sql_identifier(table)
475 ),
476 None => format!(
477 r#"SHOW CREATE {} "{}"."{}""#,
478 show_type,
479 escape_sql_identifier(&self.config.catalog),
480 escape_sql_identifier(schema)
481 ),
482 };
483
484 let records: Option<Vec<Vec<Value>>> = self
485 .database_client
486 .sql_in_public(&sql)
487 .await
488 .context(DatabaseSnafu)?;
489 let rows = records.context(EmptyResultSnafu)?;
490 let row = rows.first().context(EmptyResultSnafu)?;
491 let Some(Value::String(create)) = row.get(1) else {
492 return UnexpectedValueTypeSnafu.fail();
493 };
494
495 Ok(format!("{};\n", create))
496 }
497}
498
/// Assembles the DDL script of one schema.
///
/// The script starts with a `-- Schema: <name>` header line, followed by the
/// `CREATE DATABASE` statement, the physical-table statements, the remaining
/// table statements and the view statements, and ends with an extra newline.
/// Statements are appended verbatim, so each must carry its own terminator.
fn build_schema_ddl(
    schema: &str,
    create_database: String,
    physical_tables: Vec<String>,
    tables: Vec<String>,
    views: Vec<String>,
) -> String {
    let mut script = format!("-- Schema: {}\n", schema);
    script.push_str(&create_database);
    script.extend(physical_tables.into_iter().chain(tables).chain(views));
    script.push('\n');
    script
}
521
522fn validate_resume_config(manifest: &Manifest, config: &ExportConfig) -> Result<()> {
523 if manifest.schema_only != config.schema_only {
524 return SchemaOnlyModeMismatchSnafu {
525 existing_schema_only: manifest.schema_only,
526 requested_schema_only: config.schema_only,
527 }
528 .fail();
529 }
530
531 if manifest.catalog != config.catalog {
532 return ResumeConfigMismatchSnafu {
533 field: "catalog",
534 existing: manifest.catalog.clone(),
535 requested: config.catalog.clone(),
536 }
537 .fail();
538 }
539
540 if let Some(requested_schemas) = &config.schemas
543 && !schema_selection_matches(&manifest.schemas, requested_schemas)
544 {
545 return ResumeConfigMismatchSnafu {
546 field: "schemas",
547 existing: format_schema_selection(&manifest.schemas),
548 requested: format_schema_selection(requested_schemas),
549 }
550 .fail();
551 }
552
553 if manifest.time_range != config.time_range {
554 return ResumeConfigMismatchSnafu {
555 field: "time_range",
556 existing: format!("{:?}", manifest.time_range),
557 requested: format!("{:?}", config.time_range),
558 }
559 .fail();
560 }
561
562 if manifest.format != config.format {
563 return ResumeConfigMismatchSnafu {
564 field: "format",
565 existing: manifest.format.to_string(),
566 requested: config.format.to_string(),
567 }
568 .fail();
569 }
570
571 let expected_plan = Manifest::new_for_export(
572 manifest.catalog.clone(),
573 manifest.schemas.clone(),
574 config.schema_only,
575 config.time_range.clone(),
576 config.format,
577 config.chunk_time_window,
578 )?;
579 if !chunk_plan_matches(manifest, &expected_plan) {
580 return ResumeConfigMismatchSnafu {
581 field: "chunk plan",
582 existing: format_chunk_plan(&manifest.chunks),
583 requested: format_chunk_plan(&expected_plan.chunks),
584 }
585 .fail();
586 }
587
588 Ok(())
589}
590
591fn schema_selection_matches(existing: &[String], requested: &[String]) -> bool {
592 canonical_schema_selection(existing) == canonical_schema_selection(requested)
593}
594
/// Normalizes a schema selection for comparison: names are ASCII-lowercased,
/// sorted, and de-duplicated.
fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
    let mut names: Vec<String> = schemas
        .iter()
        .map(|schema| schema.to_ascii_lowercase())
        .collect();
    names.sort();
    // Equal names are adjacent after sorting, so `dedup` removes every duplicate.
    names.dedup();
    names
}
609
/// Renders a schema selection as `[a, b, c]` for error messages.
fn format_schema_selection(schemas: &[String]) -> String {
    let mut rendered = String::from("[");
    rendered.push_str(&schemas.join(", "));
    rendered.push(']');
    rendered
}
613
614fn chunk_plan_matches(existing: &Manifest, expected: &Manifest) -> bool {
615 existing.chunks.len() == expected.chunks.len()
616 && existing
617 .chunks
618 .iter()
619 .zip(&expected.chunks)
620 .all(|(left, right)| left.id == right.id && left.time_range == right.time_range)
621}
622
623fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
624 let items = chunks
625 .iter()
626 .map(|chunk| format!("#{}:{:?}", chunk.id, chunk.time_range))
627 .collect::<Vec<_>>();
628 format!("[{}]", items.join(", "))
629}
630
#[cfg(test)]
mod tests {
    use chrono::TimeZone;
    use clap::Parser;

    use super::*;
    use crate::data::path::ddl_path_for_schema;

    /// Arguments every CLI test needs: program name, server address and
    /// destination.
    const BASE_ARGS: [&str; 5] = [
        "export-v2-create",
        "--addr",
        "127.0.0.1:4000",
        "--to",
        "file:///tmp/export-v2-test",
    ];

    /// Parses the base arguments followed by `extra`.
    fn parse_command(extra: &[&str]) -> ExportCreateCommand {
        ExportCreateCommand::parse_from(BASE_ARGS.iter().chain(extra).copied())
    }

    /// Baseline resume configuration; tests override single fields with
    /// struct-update syntax.
    fn base_config() -> ExportConfig {
        ExportConfig {
            catalog: "greptime".to_string(),
            schemas: None,
            schema_only: false,
            format: DataFormat::Parquet,
            force: false,
            time_range: TimeRange::unbounded(),
            chunk_time_window: None,
            parallelism: 1,
            snapshot_uri: "file:///tmp/snapshot".to_string(),
            storage_config: ObjectStoreConfig::default(),
        }
    }

    /// Parquet, non-schema-only manifest of the `greptime` catalog without a
    /// chunk window.
    fn manifest_for(schemas: &[&str], time_range: TimeRange) -> Manifest {
        Manifest::new_for_export(
            "greptime".to_string(),
            schemas.iter().map(|schema| schema.to_string()).collect(),
            false,
            time_range,
            DataFormat::Parquet,
            None,
        )
        .unwrap()
    }

    /// Message of the error that resume validation is expected to return.
    fn resume_error(manifest: &Manifest, config: &ExportConfig) -> String {
        validate_resume_config(manifest, config)
            .err()
            .unwrap()
            .to_string()
    }

    /// 2025-01-01 at the given hour, UTC.
    fn utc_hour(hour: u32) -> chrono::DateTime<chrono::Utc> {
        chrono::Utc
            .with_ymd_and_hms(2025, 1, 1, hour, 0, 0)
            .unwrap()
    }

    #[test]
    fn test_ddl_path_for_schema() {
        assert_eq!(ddl_path_for_schema("public"), "schema/ddl/public.sql");
        assert_eq!(
            ddl_path_for_schema("../evil"),
            "schema/ddl/%2E%2E%2Fevil.sql"
        );
    }

    #[test]
    fn test_build_schema_ddl_order() {
        let ddl = build_schema_ddl(
            "public",
            "CREATE DATABASE public;\n".to_string(),
            vec!["PHYSICAL;\n".to_string()],
            vec!["TABLE;\n".to_string()],
            vec!["VIEW;\n".to_string()],
        );

        let position = |needle: &str| ddl.find(needle).unwrap();
        let db_pos = position("CREATE DATABASE");
        let physical_pos = position("PHYSICAL;");
        let table_pos = position("TABLE;");
        let view_pos = position("VIEW;");
        assert!(db_pos < physical_pos);
        assert!(physical_pos < table_pos);
        assert!(table_pos < view_pos);
    }

    #[tokio::test]
    async fn test_build_rejects_chunk_window_without_bounds() {
        let cmd = parse_command(&["--chunk-time-window", "1h"]);

        let result = cmd.build().await;
        assert!(result.is_err());
        let error = result.err().unwrap().to_string();

        assert!(error.contains("chunk_time_window requires both --start-time and --end-time"));
    }

    #[tokio::test]
    async fn test_build_rejects_data_export_args_in_schema_only_mode() {
        let cmd = parse_command(&[
            "--schema-only",
            "--start-time",
            "2024-01-01T00:00:00Z",
            "--end-time",
            "2024-01-02T00:00:00Z",
            "--chunk-time-window",
            "1h",
            "--format",
            "csv",
            "--parallelism",
            "2",
        ]);

        let error = cmd.build().await.err().unwrap().to_string();

        assert!(error.contains("--schema-only cannot be used with data export arguments"));
        for flag in [
            "--start-time",
            "--end-time",
            "--chunk-time-window",
            "--format",
            "--parallelism",
        ] {
            assert!(error.contains(flag));
        }
    }

    #[test]
    fn test_schema_only_mode_mismatch_error_message() {
        let error = SchemaOnlyModeMismatchSnafu {
            existing_schema_only: false,
            requested_schema_only: true,
        }
        .build()
        .to_string();

        assert!(error.contains("existing: false"));
        assert!(error.contains("requested: true"));
    }

    #[test]
    fn test_validate_resume_config_rejects_catalog_mismatch() {
        let manifest = manifest_for(&["public"], TimeRange::unbounded());
        let config = ExportConfig {
            catalog: "other".to_string(),
            ..base_config()
        };

        assert!(resume_error(&manifest, &config).contains("catalog"));
    }

    #[test]
    fn test_validate_resume_config_accepts_schema_selection_with_different_case_and_order() {
        let manifest = manifest_for(&["public", "analytics"], TimeRange::unbounded());
        let config = ExportConfig {
            schemas: Some(vec![
                "ANALYTICS".to_string(),
                "PUBLIC".to_string(),
                "public".to_string(),
            ]),
            ..base_config()
        };

        assert!(validate_resume_config(&manifest, &config).is_ok());
    }

    #[test]
    fn test_validate_resume_config_rejects_chunk_plan_mismatch() {
        let time_range = TimeRange::new(Some(utc_hour(0)), Some(utc_hour(2)));
        let manifest = manifest_for(&["public"], time_range.clone());
        let config = ExportConfig {
            time_range,
            chunk_time_window: Some(Duration::from_secs(3600)),
            ..base_config()
        };

        assert!(resume_error(&manifest, &config).contains("chunk plan"));
    }

    #[test]
    fn test_validate_resume_config_rejects_format_mismatch() {
        let manifest = manifest_for(&["public"], TimeRange::unbounded());
        let config = ExportConfig {
            format: DataFormat::Csv,
            ..base_config()
        };

        assert!(resume_error(&manifest, &config).contains("format"));
    }

    #[test]
    fn test_validate_resume_config_rejects_time_range_mismatch() {
        let start = utc_hour(0);
        let end = utc_hour(1);
        let manifest = manifest_for(&["public"], TimeRange::new(Some(start), Some(end)));
        let config = ExportConfig {
            time_range: TimeRange::new(Some(start), Some(start)),
            ..base_config()
        };

        assert!(resume_error(&manifest, &config).contains("time_range"));
    }
}