1use std::collections::HashSet;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use snafu::ResultExt;
25
26use crate::Tool;
27use crate::common::ObjectStoreConfig;
28use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from};
29use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION};
30use crate::data::import_v2::error::{
31 ChunkImportFailedSnafu, EmptyChunkManifestSnafu, IncompleteSnapshotSnafu,
32 ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, SchemaNotInSnapshotSnafu,
33 SnapshotStorageSnafu,
34};
35use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
36use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
37use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
38use crate::database::{DatabaseClient, parse_proxy_opts};
39
// CLI arguments for the snapshot import command.
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap-derived fields would become `--help` text and change CLI output.
#[derive(Debug, Parser)]
pub struct ImportV2Command {
    // Target server address; used to construct the `DatabaseClient` in `build`.
    #[clap(long)]
    addr: String,

    // Snapshot location URI to import from; validated via `validate_uri` and
    // also used as the `COPY DATABASE FROM` source (`snapshot_uri`).
    #[clap(long)]
    from: String,

    // Catalog to import into (default "greptime").
    #[clap(long, default_value = "greptime")]
    catalog: String,

    // Comma-separated schema filter; empty means "all schemas in the snapshot".
    #[clap(long, value_delimiter = ',')]
    schemas: Vec<String>,

    // When set, only print the DDL statements and data plan without executing.
    #[clap(long)]
    dry_run: bool,

    // Basic-auth credentials forwarded to the database client.
    // assumes "user:password" format — TODO confirm against DatabaseClient.
    #[clap(long)]
    auth_basic: Option<String>,

    // Per-request timeout (humantime format, e.g. "30s"); defaults to 60s in
    // `build` when absent.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    // Optional proxy URL; resolved together with `no_proxy` by `parse_proxy_opts`.
    #[clap(long)]
    proxy: Option<String>,

    // Disables proxy usage entirely.
    #[clap(long)]
    no_proxy: bool,

    // Object-store connection options for reading the snapshot.
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
89
90impl ImportV2Command {
91 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
92 validate_uri(&self.from)
94 .context(SnapshotStorageSnafu)
95 .map_err(BoxedError::new)?;
96
97 let schemas = if self.schemas.is_empty() {
99 None
100 } else {
101 Some(self.schemas.clone())
102 };
103
104 let storage = OpenDalStorage::from_uri(&self.from, &self.storage)
106 .context(SnapshotStorageSnafu)
107 .map_err(BoxedError::new)?;
108
109 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
111 let database_client = DatabaseClient::new(
112 self.addr.clone(),
113 self.catalog.clone(),
114 self.auth_basic.clone(),
115 self.timeout.unwrap_or(Duration::from_secs(60)),
116 proxy,
117 self.no_proxy,
118 );
119
120 Ok(Box::new(Import {
121 catalog: self.catalog.clone(),
122 schemas,
123 dry_run: self.dry_run,
124 snapshot_uri: self.from.clone(),
125 storage_config: self.storage.clone(),
126 storage: Box::new(storage),
127 database_client,
128 }))
129 }
130}
131
// Fully-resolved import task built from `ImportV2Command::build`.
pub struct Import {
    // Target catalog name (passed to `COPY DATABASE FROM`).
    catalog: String,
    // Canonicalized schema filter; `None` imports all schemas in the manifest.
    schemas: Option<Vec<String>>,
    // When true, `run` prints the plan and executes nothing.
    dry_run: bool,
    // Original snapshot URI; used to build per-chunk copy sources.
    snapshot_uri: String,
    // Object-store options mirrored into the copy-source connection string.
    storage_config: ObjectStoreConfig,
    // Storage backend for reading manifest, DDL files, and listing data files.
    storage: Box<dyn SnapshotStorage>,
    // Client used for executing DDL and COPY statements against the server.
    database_client: DatabaseClient,
}
142
143#[async_trait]
144impl Tool for Import {
145 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
146 self.run().await.map_err(BoxedError::new)
147 }
148}
149
impl Import {
    /// Top-level import workflow: read and validate the snapshot manifest,
    /// resolve the schema filter, then either print a dry-run plan or execute
    /// the DDL statements followed by the chunk data copy.
    async fn run(&self) -> Result<()> {
        let manifest = self
            .storage
            .read_manifest()
            .await
            .context(SnapshotStorageSnafu)?;

        info!(
            "Loading snapshot: {} (version: {}, schema_only: {})",
            manifest.snapshot_id, manifest.version, manifest.schema_only
        );

        // Refuse snapshots produced by an incompatible manifest version.
        if manifest.version != MANIFEST_VERSION {
            return ManifestVersionMismatchSnafu {
                expected: MANIFEST_VERSION,
                found: manifest.version,
            }
            .fail();
        }

        info!("Snapshot contains {} schema(s)", manifest.schemas.len());

        // No CLI filter means "import every schema recorded in the manifest".
        let schemas_to_import = match &self.schemas {
            Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
            None => manifest.schemas.clone(),
        };

        info!("Importing schemas: {:?}", schemas_to_import);

        let ddl_statements = self.read_ddl_statements(&schemas_to_import).await?;

        info!("Generated {} DDL statements", ddl_statements.len());

        // Validate chunk data up front — before any DDL runs — so a broken
        // snapshot is rejected in both dry-run and real imports. `None` for
        // schema-only or chunk-less snapshots.
        let data_prefixes = if !manifest.schema_only && !manifest.chunks.is_empty() {
            Some(
                validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import)
                    .await?,
            )
        } else {
            None
        };

        if self.dry_run {
            // Dry-run: print what would be executed, then stop without
            // contacting the database.
            info!("Dry-run mode - DDL statements to execute:");
            println!();
            for (i, stmt) in ddl_statements.iter().enumerate() {
                println!("-- Statement {}", i + 1);
                println!("{};", stmt.sql);
                println!();
            }
            if !manifest.schema_only && !manifest.chunks.is_empty() {
                for line in format_data_import_plan(&manifest.chunks, &schemas_to_import) {
                    println!("{line}");
                }
                println!();
            }
            return Ok(());
        }

        // Schema first: tables/views must exist before data can be copied in.
        let executor = DdlExecutor::new(&self.database_client);
        executor.execute_strict(&ddl_statements).await?;

        if !manifest.schema_only && !manifest.chunks.is_empty() {
            self.import_data(
                &manifest.chunks,
                &schemas_to_import,
                manifest.format,
                // Safe: the same condition guarded the validation above.
                data_prefixes.expect("validated full snapshot must provide data prefixes"),
            )
            .await?;
        }

        info!(
            "Import completed: {} DDL statements executed",
            ddl_statements.len()
        );

        Ok(())
    }

    /// Reads each schema's DDL file from the snapshot and splits it into
    /// individual statements tagged with the schema they should run under.
    async fn read_ddl_statements(&self, schemas: &[String]) -> Result<Vec<DdlStatement>> {
        let mut statements = Vec::new();
        for schema in schemas {
            let path = ddl_path_for_schema(schema);
            let content = self
                .storage
                .read_text(&path)
                .await
                .context(SnapshotStorageSnafu)?;
            statements.extend(
                parse_ddl_statements(&content)
                    .into_iter()
                    .map(|sql| ddl_statement_for_schema(schema, sql)),
            );
        }

        Ok(statements)
    }

    /// Copies chunk data into the target database: one `COPY DATABASE FROM`
    /// per (chunk, schema) pair that actually has files in the snapshot.
    ///
    /// `actual_prefixes` is the set of `data/<schema>/<chunk>/` prefixes that
    /// exist on storage, produced by `validate_data_snapshot`.
    async fn import_data(
        &self,
        chunks: &[ChunkMeta],
        schemas: &[String],
        format: DataFormat,
        actual_prefixes: HashSet<String>,
    ) -> Result<()> {
        let import_start = Instant::now();
        // Skipped chunks carry no data, so only completed ones count toward
        // the progress total.
        let total_chunks = chunks
            .iter()
            .filter(|chunk| chunk.status == ChunkStatus::Completed)
            .count();
        info!(
            "Importing data: {} chunks, {} schemas",
            total_chunks,
            schemas.len()
        );

        for (idx, chunk) in chunks.iter().enumerate() {
            if chunk.status == ChunkStatus::Skipped {
                info!(
                    "[{}/{}] Chunk {}: skipped (no data)",
                    idx + 1,
                    chunks.len(),
                    chunk.id
                );
                continue;
            }

            info!(
                "[{}/{}] Chunk {} ({:?} ~ {:?})",
                idx + 1,
                chunks.len(),
                chunk.id,
                chunk.time_range.start,
                chunk.time_range.end
            );

            for schema in schemas {
                // Re-checks chunk/schema presence; `false` means this schema
                // simply has no files in this chunk.
                if !validate_chunk_schema_files(chunk, schema, &actual_prefixes)? {
                    info!(" {}: no data, skipped", schema);
                    continue;
                }

                info!(" {}: importing...", schema);
                let copy_start = Instant::now();
                let source =
                    build_copy_source(&self.snapshot_uri, &self.storage_config, schema, chunk.id)
                        .context(ChunkImportFailedSnafu {
                            chunk_id: chunk.id,
                            schema: schema.clone(),
                        })?;

                execute_copy_database_from(
                    &self.database_client,
                    &self.catalog,
                    schema,
                    &source,
                    format,
                )
                .await
                .context(ChunkImportFailedSnafu {
                    chunk_id: chunk.id,
                    schema: schema.clone(),
                })?;

                info!(" {}: done in {:?}", schema, copy_start.elapsed());
            }
        }

        info!("Data import finished in {:?}", import_start.elapsed());
        Ok(())
    }
}
330
/// Splits raw DDL text into individual statements on `;`, while ignoring
/// semicolons inside single/double-quoted literals (with `''`/`""` escaping)
/// and inside `--` line comments or `/* */` block comments.
///
/// Comment text is dropped from the output (line comments keep their trailing
/// newline so statements stay separated); a trailing statement without a final
/// semicolon is still emitted.
fn parse_ddl_statements(content: &str) -> Vec<String> {
    /// Lexer mode for the character scan below.
    enum Mode {
        Normal,
        LineComment,
        BlockComment,
        SingleQuoted,
        DoubleQuoted,
    }

    let mut statements = Vec::new();
    let mut buf = String::new();
    let mut mode = Mode::Normal;
    let mut chars = content.chars().peekable();

    while let Some(ch) = chars.next() {
        match mode {
            Mode::LineComment => {
                // Keep the newline so adjacent statements don't fuse together.
                if ch == '\n' {
                    mode = Mode::Normal;
                    buf.push('\n');
                }
            }
            Mode::BlockComment => {
                if ch == '*' && chars.peek() == Some(&'/') {
                    chars.next();
                    mode = Mode::Normal;
                }
            }
            Mode::SingleQuoted => {
                buf.push(ch);
                if ch == '\'' {
                    // `''` is an escaped quote inside the literal.
                    if chars.peek() == Some(&'\'') {
                        buf.push(chars.next().expect("peeked quote must exist"));
                    } else {
                        mode = Mode::Normal;
                    }
                }
            }
            Mode::DoubleQuoted => {
                buf.push(ch);
                if ch == '"' {
                    // `""` is an escaped quote inside the identifier.
                    if chars.peek() == Some(&'"') {
                        buf.push(chars.next().expect("peeked quote must exist"));
                    } else {
                        mode = Mode::Normal;
                    }
                }
            }
            Mode::Normal => match ch {
                '-' if chars.peek() == Some(&'-') => {
                    chars.next();
                    mode = Mode::LineComment;
                }
                '/' if chars.peek() == Some(&'*') => {
                    chars.next();
                    mode = Mode::BlockComment;
                }
                '\'' => {
                    mode = Mode::SingleQuoted;
                    buf.push(ch);
                }
                '"' => {
                    mode = Mode::DoubleQuoted;
                    buf.push(ch);
                }
                ';' => {
                    let stmt = buf.trim();
                    if !stmt.is_empty() {
                        statements.push(stmt.to_string());
                    }
                    buf.clear();
                }
                _ => buf.push(ch),
            },
        }
    }

    // Flush a final statement that lacks a terminating semicolon.
    let stmt = buf.trim();
    if !stmt.is_empty() {
        statements.push(stmt.to_string());
    }

    statements
}
416
417fn ddl_statement_for_schema(schema: &str, sql: String) -> DdlStatement {
418 if is_schema_scoped_statement(&sql) {
419 DdlStatement::with_execution_schema(sql, schema.to_string())
420 } else {
421 DdlStatement::new(sql)
422 }
423}
424
/// Returns true when `sql` is a statement that must run inside a schema:
/// `CREATE [OR REPLACE] [EXTERNAL] TABLE|VIEW ...` (ASCII case-insensitive).
fn is_schema_scoped_statement(sql: &str) -> bool {
    // True iff `input` begins with `keyword` (ASCII case-insensitive)
    // followed by a word boundary (end of input or a non-identifier byte).
    fn leads_with(input: &str, keyword: &str) -> bool {
        let head_matches = input
            .get(0..keyword.len())
            .map(|head| head.eq_ignore_ascii_case(keyword))
            .unwrap_or(false);
        let boundary = input
            .as_bytes()
            .get(keyword.len())
            .map(|b| !b.is_ascii_alphanumeric() && *b != b'_')
            .unwrap_or(true);
        head_matches && boundary
    }

    // Drops `keyword` plus any following whitespace, returning the remainder.
    fn consume<'a>(input: &'a str, keyword: &str) -> Option<&'a str> {
        input.get(keyword.len()..).map(str::trim_start)
    }

    let mut rest = sql.trim_start();
    if !leads_with(rest, "CREATE") {
        return false;
    }
    rest = match consume(rest, "CREATE") {
        Some(next) => next,
        None => return false,
    };

    // Optional "OR REPLACE" — both words must be present together.
    if leads_with(rest, "OR") {
        rest = match consume(rest, "OR") {
            Some(next) => next,
            None => return false,
        };
        if !leads_with(rest, "REPLACE") {
            return false;
        }
        rest = match consume(rest, "REPLACE") {
            Some(next) => next,
            None => return false,
        };
    }

    // Optional "EXTERNAL" before TABLE.
    if leads_with(rest, "EXTERNAL") {
        rest = match consume(rest, "EXTERNAL") {
            Some(next) => next,
            None => return false,
        };
    }

    leads_with(rest, "TABLE") || leads_with(rest, "VIEW")
}
458
/// Returns true when `input` starts with `keyword` (ASCII case-insensitive)
/// and the keyword ends at a word boundary — i.e. the next byte, if any, is
/// not an ASCII alphanumeric or underscore.
fn starts_with_keyword(input: &str, keyword: &str) -> bool {
    let head = match input.get(0..keyword.len()) {
        Some(head) => head,
        // Input shorter than the keyword (or cut mid-UTF-8) cannot match.
        None => return false,
    };
    if !head.eq_ignore_ascii_case(keyword) {
        return false;
    }
    match input.as_bytes().get(keyword.len()) {
        Some(next) => !next.is_ascii_alphanumeric() && *next != b'_',
        // Keyword is the entire input — counts as a boundary.
        None => true,
    }
}
470
471fn canonicalize_schema_filter(
472 filter: &[String],
473 manifest_schemas: &[String],
474) -> Result<Vec<String>> {
475 let mut canonicalized = Vec::new();
476 let mut seen = HashSet::new();
477
478 for schema in filter {
479 let canonical = manifest_schemas
480 .iter()
481 .find(|candidate| candidate.eq_ignore_ascii_case(schema))
482 .cloned()
483 .ok_or_else(|| {
484 SchemaNotInSnapshotSnafu {
485 schema: schema.clone(),
486 }
487 .build()
488 })?;
489
490 if seen.insert(canonical.to_ascii_lowercase()) {
491 canonicalized.push(canonical);
492 }
493 }
494
495 Ok(canonicalized)
496}
497
498fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> {
499 let invalid_chunk = chunks
500 .iter()
501 .find(|chunk| !matches!(chunk.status, ChunkStatus::Completed | ChunkStatus::Skipped));
502
503 if let Some(chunk) = invalid_chunk {
504 return IncompleteSnapshotSnafu {
505 chunk_id: chunk.id,
506 status: chunk.status,
507 }
508 .fail();
509 }
510
511 Ok(())
512}
513
514fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
515 let prefix = data_dir_for_schema_chunk(schema, chunk.id);
516 chunk.files.iter().any(|path| {
517 let normalized = path.trim_start_matches('/');
518 normalized.starts_with(&prefix)
519 })
520}
521
522fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<String> {
523 let mut lines = vec!["-- Data import plan:".to_string()];
524 for chunk in chunks {
525 lines.push(format!("-- Chunk {}: {:?}", chunk.id, chunk.status));
526 for schema in schemas {
527 if chunk_has_schema_files(chunk, schema) {
528 lines.push(format!("-- {} -> COPY DATABASE FROM", schema));
529 }
530 }
531 }
532 lines
533}
534
535async fn validate_data_snapshot(
536 storage: &dyn SnapshotStorage,
537 chunks: &[ChunkMeta],
538 schemas: &[String],
539) -> Result<HashSet<String>> {
540 validate_chunk_statuses(chunks)?;
541 let actual_prefixes = collect_chunk_data_prefixes(storage).await?;
542
543 for chunk in chunks {
544 if chunk.status == ChunkStatus::Skipped {
545 continue;
546 }
547 if chunk.files.is_empty() {
548 return EmptyChunkManifestSnafu { chunk_id: chunk.id }.fail();
549 }
550 for schema in schemas {
551 validate_chunk_schema_files(chunk, schema, &actual_prefixes)?;
552 }
553 }
554
555 Ok(actual_prefixes)
556}
557
558async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result<HashSet<String>> {
559 let files = storage
560 .list_files_recursive("data/")
561 .await
562 .context(SnapshotStorageSnafu)?;
563 let mut prefixes = HashSet::new();
564
565 for path in files {
566 let normalized = path.trim_start_matches('/');
567 let mut parts = normalized.splitn(4, '/');
568 let Some(root) = parts.next() else {
569 continue;
570 };
571 let Some(schema) = parts.next() else {
572 continue;
573 };
574 let Some(chunk_id) = parts.next() else {
575 continue;
576 };
577 if root != "data" {
578 continue;
579 }
580 prefixes.insert(format!("data/{schema}/{chunk_id}/"));
581 }
582
583 Ok(prefixes)
584}
585
586fn validate_chunk_schema_files(
587 chunk: &ChunkMeta,
588 schema: &str,
589 actual_prefixes: &HashSet<String>,
590) -> Result<bool> {
591 if !chunk_has_schema_files(chunk, schema) {
592 return Ok(false);
593 }
594
595 let prefix = data_dir_for_schema_chunk(schema, chunk.id);
596 if !actual_prefixes.contains(&prefix) {
597 return MissingChunkDataSnafu {
598 chunk_id: chunk.id,
599 schema: schema.to_string(),
600 path: prefix,
601 }
602 .fail();
603 }
604
605 Ok(true)
606}
607
#[cfg(test)]
mod tests {
    // Unit tests for the pure helpers (DDL parsing, keyword detection,
    // schema canonicalization) and the snapshot-validation path, driven by a
    // minimal in-memory SnapshotStorage stub.
    use std::collections::{HashMap, HashSet};

    use async_trait::async_trait;

    use super::*;
    use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, Manifest, TimeRange};
    use crate::data::export_v2::schema::SchemaSnapshot;
    use crate::data::snapshot_storage::SnapshotStorage;

    // In-memory storage stub: serves a fixed manifest and a prefix->files map
    // for `list_files_recursive`; all unused trait methods are unimplemented.
    struct StubStorage {
        manifest: Manifest,
        files_by_prefix: HashMap<String, Vec<String>>,
    }

    #[async_trait]
    impl SnapshotStorage for StubStorage {
        async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
            Ok(true)
        }

        async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
            Ok(self.manifest.clone())
        }

        async fn write_manifest(
            &self,
            _manifest: &Manifest,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_text(
            &self,
            _path: &str,
            _content: &str,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_schema(
            &self,
            _snapshot: &SchemaSnapshot,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn list_files_recursive(
            &self,
            prefix: &str,
        ) -> crate::data::export_v2::error::Result<Vec<String>> {
            // Return files from every map entry whose key starts with the
            // requested prefix, mimicking a recursive listing.
            Ok(self
                .files_by_prefix
                .iter()
                .filter(|(candidate, _)| candidate.starts_with(prefix))
                .flat_map(|(_, files)| files.clone())
                .collect())
        }

        async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }
    }

    #[test]
    fn test_parse_ddl_statements() {
        let content = r#"
-- Schema: public
CREATE DATABASE public;
CREATE TABLE t (ts TIMESTAMP TIME INDEX, host STRING, PRIMARY KEY (host)) ENGINE=mito;

-- comment
CREATE VIEW v AS SELECT * FROM t;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 3);
        assert!(statements[0].starts_with("CREATE DATABASE public"));
        assert!(statements[1].starts_with("CREATE TABLE t"));
        assert!(statements[2].starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_parse_ddl_statements_preserves_semicolons_in_string_literals() {
        // Semicolons inside quoted literals must not split statements.
        let content = r#"
CREATE TABLE t (
 host STRING DEFAULT 'a;b'
);
CREATE VIEW v AS SELECT ';' AS marker;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].contains("'a;b'"));
        assert!(statements[1].contains("';' AS marker"));
    }

    #[test]
    fn test_parse_ddl_statements_handles_comments_without_splitting() {
        // Semicolons inside -- and /* */ comments must be ignored.
        let content = r#"
-- leading comment
CREATE TABLE t (ts TIMESTAMP TIME INDEX); /* block; comment */
CREATE VIEW v AS SELECT 1;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].starts_with("CREATE TABLE t"));
        assert!(statements[1].starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_canonicalize_schema_filter_uses_manifest_casing() {
        let filter = vec!["TEST_DB".to_string(), "PUBLIC".to_string()];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_dedupes_case_insensitive_matches() {
        let filter = vec![
            "TEST_DB".to_string(),
            "test_db".to_string(),
            "PUBLIC".to_string(),
            "public".to_string(),
        ];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_rejects_missing_schema() {
        let filter = vec!["missing".to_string()];
        let manifest_schemas = vec!["test_db".to_string()];

        let error = canonicalize_schema_filter(&filter, &manifest_schemas)
            .expect_err("missing schema should fail")
            .to_string();

        assert!(error.contains("missing"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX) ENGINE=mito".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_or_replace_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE OR REPLACE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_external_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE EXTERNAL TABLE IF NOT EXISTS ext_metrics (ts TIMESTAMP TIME INDEX) ENGINE=file"
                .to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_database_uses_public_context() {
        // CREATE DATABASE is catalog-level: no execution schema is attached.
        let stmt = ddl_statement_for_schema("test_db", "CREATE DATABASE test_db".to_string());
        assert_eq!(stmt.execution_schema, None);
    }

    #[test]
    fn test_starts_with_keyword_requires_word_boundary() {
        assert!(starts_with_keyword("CREATE TABLE t", "CREATE"));
        assert!(!starts_with_keyword("CREATED TABLE t", "CREATE"));
        assert!(!starts_with_keyword("TABLESPACE foo", "TABLE"));
    }

    #[test]
    fn test_validate_chunk_statuses_rejects_failed_chunk() {
        let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
        failed.status = ChunkStatus::Failed;

        let error = validate_chunk_statuses(&[failed]).expect_err("failed chunk should error");
        assert!(error.to_string().contains("Incomplete snapshot"));
    }

    #[test]
    fn test_validate_chunk_statuses_accepts_completed_and_skipped_chunks() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());

        assert!(validate_chunk_statuses(&[completed, skipped]).is_ok());
    }

    #[test]
    fn test_chunk_has_schema_files_matches_encoded_schema_prefix() {
        // Schema names are percent-encoded in data paths; matching must work
        // for both plain and encoded schemas.
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec![
            "data/public/7/a.parquet".to_string(),
            "data/%E6%B5%8B%E8%AF%95/7/b.parquet".to_string(),
        ];

        assert!(chunk_has_schema_files(&chunk, "public"));
        assert!(chunk_has_schema_files(&chunk, "测试"));
        assert!(!chunk_has_schema_files(&chunk, "metrics"));
    }

    #[test]
    fn test_format_data_import_plan_includes_matching_schemas_only() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec![
            "data/public/1/a.parquet".to_string(),
            "data/%E6%B5%8B%E8%AF%95/1/b.parquet".to_string(),
        ];
        let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());

        let lines = format_data_import_plan(
            &[completed, skipped],
            &[
                "public".to_string(),
                "测试".to_string(),
                "metrics".to_string(),
            ],
        );

        assert_eq!(lines[0], "-- Data import plan:");
        assert!(lines.contains(&"-- Chunk 1: Completed".to_string()));
        assert!(lines.contains(&"-- public -> COPY DATABASE FROM".to_string()));
        assert!(lines.contains(&"-- 测试 -> COPY DATABASE FROM".to_string()));
        assert!(!lines.contains(&"-- metrics -> COPY DATABASE FROM".to_string()));
        assert!(lines.contains(&"-- Chunk 2: Skipped".to_string()));
    }

    #[tokio::test]
    async fn test_collect_chunk_data_prefixes_indexes_present_prefixes() {
        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::from([
                (
                    "data/public/7/".to_string(),
                    vec!["data/public/7/a.parquet".to_string()],
                ),
                (
                    "data/%E6%B5%8B%E8%AF%95/9/".to_string(),
                    vec!["data/%E6%B5%8B%E8%AF%95/9/b.parquet".to_string()],
                ),
            ]),
        };

        let prefixes = collect_chunk_data_prefixes(&storage).await.unwrap();

        assert!(prefixes.contains("data/public/7/"));
        assert!(prefixes.contains("data/%E6%B5%8B%E8%AF%95/9/"));
    }

    #[test]
    fn test_validate_chunk_schema_files_accepts_present_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];
        let actual_prefixes = HashSet::from(["data/public/7/".to_string()]);

        assert!(validate_chunk_schema_files(&chunk, "public", &actual_prefixes).unwrap());
    }

    #[test]
    fn test_validate_chunk_schema_files_rejects_missing_prefix() {
        // Manifest lists files but storage has none -> MissingChunkData error.
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];

        let error = validate_chunk_schema_files(&chunk, "public", &HashSet::new())
            .expect_err("missing chunk prefix should fail")
            .to_string();
        assert!(error.contains("marked completed but no files were found"));
    }

    #[test]
    fn test_validate_chunk_schema_files_skips_absent_schema() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];

        assert!(!validate_chunk_schema_files(&chunk, "metrics", &HashSet::new()).unwrap());
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_failed_chunk_before_dry_run() {
        let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
        failed.status = ChunkStatus::Failed;

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[failed], &["public".to_string()])
            .await
            .expect_err("failed chunk should reject dry-run validation")
            .to_string();
        assert!(error.contains("Incomplete snapshot"));
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_missing_chunk_prefix_before_dry_run() {
        let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec!["data/public/7/a.parquet".to_string()];

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
            .await
            .expect_err("missing chunk prefix should reject dry-run validation")
            .to_string();
        assert!(error.contains("marked completed but no files were found"));
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_completed_chunk_with_empty_manifest() {
        // A completed chunk with an empty file list is inconsistent.
        let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
            .await
            .expect_err("empty completed chunk should reject validation")
            .to_string();
        assert!(error.contains("file manifest is empty"));
    }
}