// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Import V2 CLI command.
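//!
//! The import flow: read the snapshot manifest, validate chunk integrity,
//! replay the recorded DDL statements against the target server, and, for
//! full snapshots, restore table data chunk by chunk via
//! `COPY DATABASE ... FROM`.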

use std::collections::HashSet;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_telemetry::info;
use snafu::ResultExt;

use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from};
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION};
use crate::data::import_v2::error::{
    ChunkImportFailedSnafu, EmptyChunkManifestSnafu, IncompleteSnapshotSnafu,
    ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, SchemaNotInSnapshotSnafu,
    SnapshotStorageSnafu,
};
use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::database::{DatabaseClient, parse_proxy_opts};

/// Import from a snapshot.
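///
/// A sketch of a typical invocation (the exact subcommand path depends on how
/// this command is mounted in the CLI; the flags are the ones defined below):
///
/// ```text
/// <cli> import-v2 \
///     --addr 127.0.0.1:4000 \
///     --from s3://bucket/path/to/snapshot \
///     --schemas public,test_db \
///     --dry-run
/// ```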
#[derive(Debug, Parser)]
pub struct ImportV2Command {
    /// Server address to connect to (e.g., 127.0.0.1:4000).
    #[clap(long)]
    addr: String,

    /// Source snapshot location (e.g., s3://bucket/path, file:///tmp/backup).
    #[clap(long)]
    from: String,

    /// Target catalog name.
    #[clap(long, default_value = "greptime")]
    catalog: String,

    /// Schema list to import (default: all in snapshot).
    /// Can be specified multiple times or comma-separated.
    #[clap(long, value_delimiter = ',')]
    schemas: Vec<String>,

    /// Verify without importing (dry-run).
    #[clap(long)]
    dry_run: bool,

    /// Basic authentication (user:password).
    #[clap(long)]
    auth_basic: Option<String>,

    /// Request timeout.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// Proxy server address.
    ///
    /// If set, it overrides the system proxy unless `--no-proxy` is specified.
    /// If neither `--proxy` nor `--no-proxy` is set, system proxy (env) may be used.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable all proxy usage (ignores `--proxy` and system proxy).
    ///
    /// When set and `--proxy` is not provided, this explicitly disables system proxy.
    #[clap(long)]
    no_proxy: bool,

    /// Object store configuration for remote storage backends.
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}

impl ImportV2Command {
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        // Validate URI format
        validate_uri(&self.from)
            .context(SnapshotStorageSnafu)
            .map_err(BoxedError::new)?;

        // Parse schemas (empty vec means all schemas)
        let schemas = if self.schemas.is_empty() {
            None
        } else {
            Some(self.schemas.clone())
        };

        // Build storage
        let storage = OpenDalStorage::from_uri(&self.from, &self.storage)
            .context(SnapshotStorageSnafu)
            .map_err(BoxedError::new)?;

        // Build database client
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            self.catalog.clone(),
            self.auth_basic.clone(),
            self.timeout.unwrap_or(Duration::from_secs(60)),
            proxy,
            self.no_proxy,
        );

        Ok(Box::new(Import {
            catalog: self.catalog.clone(),
            schemas,
            dry_run: self.dry_run,
            snapshot_uri: self.from.clone(),
            storage_config: self.storage.clone(),
            storage: Box::new(storage),
            database_client,
        }))
    }
}

/// Import tool implementation.
pub struct Import {
    catalog: String,
    schemas: Option<Vec<String>>,
    dry_run: bool,
    snapshot_uri: String,
    storage_config: ObjectStoreConfig,
    storage: Box<dyn SnapshotStorage>,
    database_client: DatabaseClient,
}

#[async_trait]
impl Tool for Import {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        self.run().await.map_err(BoxedError::new)
    }
}

impl Import {
    async fn run(&self) -> Result<()> {
        // 1. Read manifest
        let manifest = self
            .storage
            .read_manifest()
            .await
            .context(SnapshotStorageSnafu)?;

        info!(
            "Loading snapshot: {} (version: {}, schema_only: {})",
            manifest.snapshot_id, manifest.version, manifest.schema_only
        );

        // Check version compatibility
        if manifest.version != MANIFEST_VERSION {
            return ManifestVersionMismatchSnafu {
                expected: MANIFEST_VERSION,
                found: manifest.version,
            }
            .fail();
        }

        info!("Snapshot contains {} schema(s)", manifest.schemas.len());

        // 2. Determine schemas to import
        let schemas_to_import = match &self.schemas {
            Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
            None => manifest.schemas.clone(),
        };

        info!("Importing schemas: {:?}", schemas_to_import);

        // 3. Read DDL statements
        let ddl_statements = self.read_ddl_statements(&schemas_to_import).await?;

        info!("Generated {} DDL statements", ddl_statements.len());

        let data_prefixes = if !manifest.schema_only && !manifest.chunks.is_empty() {
            Some(
                validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import)
                    .await?,
            )
        } else {
            None
        };

        // 4. Dry-run mode: print DDL and exit
        if self.dry_run {
            info!("Dry-run mode - DDL statements to execute:");
            println!();
            for (i, stmt) in ddl_statements.iter().enumerate() {
                println!("-- Statement {}", i + 1);
                println!("{};", stmt.sql);
                println!();
            }
            if !manifest.schema_only && !manifest.chunks.is_empty() {
                for line in format_data_import_plan(&manifest.chunks, &schemas_to_import) {
                    println!("{line}");
                }
                println!();
            }
            return Ok(());
        }

        // 5. Execute DDL
        let executor = DdlExecutor::new(&self.database_client);
        executor.execute_strict(&ddl_statements).await?;

        if !manifest.schema_only && !manifest.chunks.is_empty() {
            self.import_data(
                &manifest.chunks,
                &schemas_to_import,
                manifest.format,
                data_prefixes.expect("validated full snapshot must provide data prefixes"),
            )
            .await?;
        }

        info!(
            "Import completed: {} DDL statements executed",
            ddl_statements.len()
        );

        Ok(())
    }

    async fn read_ddl_statements(&self, schemas: &[String]) -> Result<Vec<DdlStatement>> {
        let mut statements = Vec::new();
        for schema in schemas {
            let path = ddl_path_for_schema(schema);
            let content = self
                .storage
                .read_text(&path)
                .await
                .context(SnapshotStorageSnafu)?;
            statements.extend(
                parse_ddl_statements(&content)
                    .into_iter()
                    .map(|sql| ddl_statement_for_schema(schema, sql)),
            );
        }

        Ok(statements)
    }

    async fn import_data(
        &self,
        chunks: &[ChunkMeta],
        schemas: &[String],
        format: DataFormat,
        actual_prefixes: HashSet<String>,
    ) -> Result<()> {
        let import_start = Instant::now();
        let total_chunks = chunks
            .iter()
            .filter(|chunk| chunk.status == ChunkStatus::Completed)
            .count();
        info!(
            "Importing data: {} chunks, {} schemas",
            total_chunks,
            schemas.len()
        );

        for (idx, chunk) in chunks.iter().enumerate() {
            if chunk.status == ChunkStatus::Skipped {
                info!(
                    "[{}/{}] Chunk {}: skipped (no data)",
                    idx + 1,
                    chunks.len(),
                    chunk.id
                );
                continue;
            }

            info!(
                "[{}/{}] Chunk {} ({:?} ~ {:?})",
                idx + 1,
                chunks.len(),
                chunk.id,
                chunk.time_range.start,
                chunk.time_range.end
            );

            for schema in schemas {
                if !validate_chunk_schema_files(chunk, schema, &actual_prefixes)? {
                    info!("  {}: no data, skipped", schema);
                    continue;
                }

                info!("  {}: importing...", schema);
                let copy_start = Instant::now();
                let source =
                    build_copy_source(&self.snapshot_uri, &self.storage_config, schema, chunk.id)
                        .context(ChunkImportFailedSnafu {
                        chunk_id: chunk.id,
                        schema: schema.clone(),
                    })?;

                execute_copy_database_from(
                    &self.database_client,
                    &self.catalog,
                    schema,
                    &source,
                    format,
                )
                .await
                .context(ChunkImportFailedSnafu {
                    chunk_id: chunk.id,
                    schema: schema.clone(),
                })?;

                info!("  {}: done in {:?}", schema, copy_start.elapsed());
            }
        }

        info!("Data import finished in {:?}", import_start.elapsed());
        Ok(())
    }
}

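/// Splits DDL script text into individual statements on `;`.
///
/// Semicolons inside single- or double-quoted literals (with `''` / `""`
/// escapes preserved) never split a statement, and `-- line` as well as
/// `/* block */` comments are stripped rather than emitted, so a semicolon
/// inside a comment cannot produce a spurious statement. A final statement
/// without a trailing `;` is still flushed at end of input.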
fn parse_ddl_statements(content: &str) -> Vec<String> {
    let mut statements = Vec::new();
    let mut current = String::new();
    let mut chars = content.chars().peekable();
    let mut in_single_quote = false;
    let mut in_double_quote = false;
    let mut in_line_comment = false;
    let mut in_block_comment = false;

    while let Some(ch) = chars.next() {
        if in_line_comment {
            if ch == '\n' {
                in_line_comment = false;
                current.push('\n');
            }
            continue;
        }

        if in_block_comment {
            if ch == '*' && chars.peek() == Some(&'/') {
                chars.next();
                in_block_comment = false;
            }
            continue;
        }

        if in_single_quote {
            current.push(ch);
            if ch == '\'' {
                if chars.peek() == Some(&'\'') {
                    current.push(chars.next().expect("peeked quote must exist"));
                } else {
                    in_single_quote = false;
                }
            }
            continue;
        }

        if in_double_quote {
            current.push(ch);
            if ch == '"' {
                if chars.peek() == Some(&'"') {
                    current.push(chars.next().expect("peeked quote must exist"));
                } else {
                    in_double_quote = false;
                }
            }
            continue;
        }

        match ch {
            '-' if chars.peek() == Some(&'-') => {
                chars.next();
                in_line_comment = true;
            }
            '/' if chars.peek() == Some(&'*') => {
                chars.next();
                in_block_comment = true;
            }
            '\'' => {
                in_single_quote = true;
                current.push(ch);
            }
            '"' => {
                in_double_quote = true;
                current.push(ch);
            }
            ';' => {
                let statement = current.trim();
                if !statement.is_empty() {
                    statements.push(statement.to_string());
                }
                current.clear();
            }
            _ => current.push(ch),
        }
    }

    let statement = current.trim();
    if !statement.is_empty() {
        statements.push(statement.to_string());
    }

    statements
}

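/// Wraps a parsed SQL string into a [`DdlStatement`], pinning schema-scoped
/// statements (see [`is_schema_scoped_statement`]) to the schema they were
/// exported from so they execute in that schema's context, while statements
/// like `CREATE DATABASE` keep the default execution context.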
fn ddl_statement_for_schema(schema: &str, sql: String) -> DdlStatement {
    if is_schema_scoped_statement(&sql) {
        DdlStatement::with_execution_schema(sql, schema.to_string())
    } else {
        DdlStatement::new(sql)
    }
}

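/// Returns `true` for statements that must run inside a specific schema:
/// `CREATE [OR REPLACE] [EXTERNAL] TABLE | VIEW`, matched case-insensitively.
/// Anything else (e.g. `CREATE DATABASE`) is not schema-scoped.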
fn is_schema_scoped_statement(sql: &str) -> bool {
    let trimmed = sql.trim_start();
    if !starts_with_keyword(trimmed, "CREATE") {
        return false;
    }

    let Some(rest) = trimmed.get("CREATE".len()..) else {
        return false;
    };
    let mut rest = rest.trim_start();
    if starts_with_keyword(rest, "OR") {
        let Some(next) = rest.get("OR".len()..) else {
            return false;
        };
        rest = next.trim_start();
        if !starts_with_keyword(rest, "REPLACE") {
            return false;
        }
        let Some(next) = rest.get("REPLACE".len()..) else {
            return false;
        };
        rest = next.trim_start();
    }

    if starts_with_keyword(rest, "EXTERNAL") {
        let Some(next) = rest.get("EXTERNAL".len()..) else {
            return false;
        };
        rest = next.trim_start();
    }

    starts_with_keyword(rest, "TABLE") || starts_with_keyword(rest, "VIEW")
}

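/// Case-insensitive keyword match anchored at the start of `input`, requiring
/// a word boundary after the keyword so that e.g. `CREATED` does not match
/// `CREATE` and `TABLESPACE` does not match `TABLE`.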
fn starts_with_keyword(input: &str, keyword: &str) -> bool {
    input
        .get(0..keyword.len())
        .map(|s| s.eq_ignore_ascii_case(keyword))
        .unwrap_or(false)
        && input
            .as_bytes()
            .get(keyword.len())
            .map(|b| !b.is_ascii_alphanumeric() && *b != b'_')
            .unwrap_or(true)
}

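/// Resolves a user-supplied schema filter against the manifest's schema list
/// case-insensitively: returns the manifest's canonical casing, deduplicates
/// repeated entries, and fails if a requested schema is not in the snapshot.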
fn canonicalize_schema_filter(
    filter: &[String],
    manifest_schemas: &[String],
) -> Result<Vec<String>> {
    let mut canonicalized = Vec::new();
    let mut seen = HashSet::new();

    for schema in filter {
        let canonical = manifest_schemas
            .iter()
            .find(|candidate| candidate.eq_ignore_ascii_case(schema))
            .cloned()
            .ok_or_else(|| {
                SchemaNotInSnapshotSnafu {
                    schema: schema.clone(),
                }
                .build()
            })?;

        if seen.insert(canonical.to_ascii_lowercase()) {
            canonicalized.push(canonical);
        }
    }

    Ok(canonicalized)
}

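/// Rejects snapshots containing any chunk that is neither `Completed` nor
/// `Skipped`; a chunk in any other state (e.g. `Failed`) means the export did
/// not finish and the snapshot must not be imported.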
fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> {
    let invalid_chunk = chunks
        .iter()
        .find(|chunk| !matches!(chunk.status, ChunkStatus::Completed | ChunkStatus::Skipped));

    if let Some(chunk) = invalid_chunk {
        return IncompleteSnapshotSnafu {
            chunk_id: chunk.id,
            status: chunk.status,
        }
        .fail();
    }

    Ok(())
}

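/// Checks whether the chunk manifest lists at least one file under the
/// schema's data directory (e.g. `data/public/7/`; non-ASCII schema names are
/// percent-encoded in the path), tolerating a leading `/` on manifest paths.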
fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
    let prefix = data_dir_for_schema_chunk(schema, chunk.id);
    chunk.files.iter().any(|path| {
        let normalized = path.trim_start_matches('/');
        normalized.starts_with(&prefix)
    })
}

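/// Renders the dry-run data import plan as `--`-prefixed comment lines: one
/// line per chunk with its status, plus one line per schema that actually has
/// files in that chunk.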
fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<String> {
    let mut lines = vec!["-- Data import plan:".to_string()];
    for chunk in chunks {
        lines.push(format!("-- Chunk {}: {:?}", chunk.id, chunk.status));
        for schema in schemas {
            if chunk_has_schema_files(chunk, schema) {
                lines.push(format!("--   {} -> COPY DATABASE FROM", schema));
            }
        }
    }
    lines
}

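/// Validates a full (non-schema-only) snapshot before import: every chunk must
/// be `Completed` or `Skipped`, completed chunks must list files in the
/// manifest, and each schema's manifest entries must be backed by files that
/// actually exist in storage. Returns the set of `data/<schema>/<chunk_id>/`
/// prefixes found in storage so the import phase can reuse it.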
async fn validate_data_snapshot(
    storage: &dyn SnapshotStorage,
    chunks: &[ChunkMeta],
    schemas: &[String],
) -> Result<HashSet<String>> {
    validate_chunk_statuses(chunks)?;
    let actual_prefixes = collect_chunk_data_prefixes(storage).await?;

    for chunk in chunks {
        if chunk.status == ChunkStatus::Skipped {
            continue;
        }
        if chunk.files.is_empty() {
            return EmptyChunkManifestSnafu { chunk_id: chunk.id }.fail();
        }
        for schema in schemas {
            validate_chunk_schema_files(chunk, schema, &actual_prefixes)?;
        }
    }

    Ok(actual_prefixes)
}

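/// Lists all files under `data/` once and condenses them into the set of
/// `data/<schema>/<chunk_id>/` prefixes that are actually present, so that
/// per-chunk validation can probe a `HashSet` instead of re-listing storage.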
async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result<HashSet<String>> {
    let files = storage
        .list_files_recursive("data/")
        .await
        .context(SnapshotStorageSnafu)?;
    let mut prefixes = HashSet::new();

    for path in files {
        let normalized = path.trim_start_matches('/');
        let mut parts = normalized.splitn(4, '/');
        let Some(root) = parts.next() else {
            continue;
        };
        let Some(schema) = parts.next() else {
            continue;
        };
        let Some(chunk_id) = parts.next() else {
            continue;
        };
        if root != "data" {
            continue;
        }
        prefixes.insert(format!("data/{schema}/{chunk_id}/"));
    }

    Ok(prefixes)
}

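/// Returns `Ok(false)` when the chunk manifest lists no files for the schema
/// (nothing to import), `Ok(true)` when the expected data prefix exists in
/// storage, and an error when the manifest claims files that are missing from
/// storage.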
fn validate_chunk_schema_files(
    chunk: &ChunkMeta,
    schema: &str,
    actual_prefixes: &HashSet<String>,
) -> Result<bool> {
    if !chunk_has_schema_files(chunk, schema) {
        return Ok(false);
    }

    let prefix = data_dir_for_schema_chunk(schema, chunk.id);
    if !actual_prefixes.contains(&prefix) {
        return MissingChunkDataSnafu {
            chunk_id: chunk.id,
            schema: schema.to_string(),
            path: prefix,
        }
        .fail();
    }

    Ok(true)
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use async_trait::async_trait;

    use super::*;
    use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, Manifest, TimeRange};
    use crate::data::export_v2::schema::SchemaSnapshot;
    use crate::data::snapshot_storage::SnapshotStorage;

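    /// In-memory [`SnapshotStorage`] stub: serves a fixed manifest and a
    /// canned file listing, and panics (`unimplemented!`) on the write-side
    /// methods that the import path never touches.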
    struct StubStorage {
        manifest: Manifest,
        files_by_prefix: HashMap<String, Vec<String>>,
    }

    #[async_trait]
    impl SnapshotStorage for StubStorage {
        async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
            Ok(true)
        }

        async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
            Ok(self.manifest.clone())
        }

        async fn write_manifest(
            &self,
            _manifest: &Manifest,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_text(
            &self,
            _path: &str,
            _content: &str,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_schema(
            &self,
            _snapshot: &SchemaSnapshot,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn list_files_recursive(
            &self,
            prefix: &str,
        ) -> crate::data::export_v2::error::Result<Vec<String>> {
            Ok(self
                .files_by_prefix
                .iter()
                .filter(|(candidate, _)| candidate.starts_with(prefix))
                .flat_map(|(_, files)| files.clone())
                .collect())
        }

        async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }
    }

    #[test]
    fn test_parse_ddl_statements() {
        let content = r#"
-- Schema: public
CREATE DATABASE public;
CREATE TABLE t (ts TIMESTAMP TIME INDEX, host STRING, PRIMARY KEY (host)) ENGINE=mito;

-- comment
CREATE VIEW v AS SELECT * FROM t;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 3);
        assert!(statements[0].starts_with("CREATE DATABASE public"));
        assert!(statements[1].starts_with("CREATE TABLE t"));
        assert!(statements[2].starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_parse_ddl_statements_preserves_semicolons_in_string_literals() {
        let content = r#"
CREATE TABLE t (
    host STRING DEFAULT 'a;b'
);
CREATE VIEW v AS SELECT ';' AS marker;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].contains("'a;b'"));
        assert!(statements[1].contains("';' AS marker"));
    }

    #[test]
    fn test_parse_ddl_statements_handles_comments_without_splitting() {
        let content = r#"
-- leading comment
CREATE TABLE t (ts TIMESTAMP TIME INDEX); /* block; comment */
CREATE VIEW v AS SELECT 1;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].starts_with("CREATE TABLE t"));
        assert!(statements[1].starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_canonicalize_schema_filter_uses_manifest_casing() {
        let filter = vec!["TEST_DB".to_string(), "PUBLIC".to_string()];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_dedupes_case_insensitive_matches() {
        let filter = vec![
            "TEST_DB".to_string(),
            "test_db".to_string(),
            "PUBLIC".to_string(),
            "public".to_string(),
        ];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_rejects_missing_schema() {
        let filter = vec!["missing".to_string()];
        let manifest_schemas = vec!["test_db".to_string()];

        let error = canonicalize_schema_filter(&filter, &manifest_schemas)
            .expect_err("missing schema should fail")
            .to_string();

        assert!(error.contains("missing"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX) ENGINE=mito".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_or_replace_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE OR REPLACE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_external_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE EXTERNAL TABLE IF NOT EXISTS ext_metrics (ts TIMESTAMP TIME INDEX) ENGINE=file"
                .to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_database_uses_public_context() {
        let stmt = ddl_statement_for_schema("test_db", "CREATE DATABASE test_db".to_string());
        assert_eq!(stmt.execution_schema, None);
    }

    #[test]
    fn test_starts_with_keyword_requires_word_boundary() {
        assert!(starts_with_keyword("CREATE TABLE t", "CREATE"));
        assert!(!starts_with_keyword("CREATED TABLE t", "CREATE"));
        assert!(!starts_with_keyword("TABLESPACE foo", "TABLE"));
    }

    #[test]
    fn test_validate_chunk_statuses_rejects_failed_chunk() {
        let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
        failed.status = ChunkStatus::Failed;

        let error = validate_chunk_statuses(&[failed]).expect_err("failed chunk should error");
        assert!(error.to_string().contains("Incomplete snapshot"));
    }

    #[test]
    fn test_validate_chunk_statuses_accepts_completed_and_skipped_chunks() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());

        assert!(validate_chunk_statuses(&[completed, skipped]).is_ok());
    }

    #[test]
    fn test_chunk_has_schema_files_matches_encoded_schema_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec![
            "data/public/7/a.parquet".to_string(),
            "data/%E6%B5%8B%E8%AF%95/7/b.parquet".to_string(),
        ];

        assert!(chunk_has_schema_files(&chunk, "public"));
        assert!(chunk_has_schema_files(&chunk, "测试"));
        assert!(!chunk_has_schema_files(&chunk, "metrics"));
    }

    #[test]
    fn test_format_data_import_plan_includes_matching_schemas_only() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec![
            "data/public/1/a.parquet".to_string(),
            "data/%E6%B5%8B%E8%AF%95/1/b.parquet".to_string(),
        ];
        let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());

        let lines = format_data_import_plan(
            &[completed, skipped],
            &[
                "public".to_string(),
                "测试".to_string(),
                "metrics".to_string(),
            ],
        );

        assert_eq!(lines[0], "-- Data import plan:");
        assert!(lines.contains(&"-- Chunk 1: Completed".to_string()));
        assert!(lines.contains(&"--   public -> COPY DATABASE FROM".to_string()));
        assert!(lines.contains(&"--   测试 -> COPY DATABASE FROM".to_string()));
        assert!(!lines.contains(&"--   metrics -> COPY DATABASE FROM".to_string()));
        assert!(lines.contains(&"-- Chunk 2: Skipped".to_string()));
    }

    #[tokio::test]
    async fn test_collect_chunk_data_prefixes_indexes_present_prefixes() {
        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::from([
                (
                    "data/public/7/".to_string(),
                    vec!["data/public/7/a.parquet".to_string()],
                ),
                (
                    "data/%E6%B5%8B%E8%AF%95/9/".to_string(),
                    vec!["data/%E6%B5%8B%E8%AF%95/9/b.parquet".to_string()],
                ),
            ]),
        };

        let prefixes = collect_chunk_data_prefixes(&storage).await.unwrap();

        assert!(prefixes.contains("data/public/7/"));
        assert!(prefixes.contains("data/%E6%B5%8B%E8%AF%95/9/"));
    }

    #[test]
    fn test_validate_chunk_schema_files_accepts_present_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];
        let actual_prefixes = HashSet::from(["data/public/7/".to_string()]);

        assert!(validate_chunk_schema_files(&chunk, "public", &actual_prefixes).unwrap());
    }

    #[test]
    fn test_validate_chunk_schema_files_rejects_missing_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];

        let error = validate_chunk_schema_files(&chunk, "public", &HashSet::new())
            .expect_err("missing chunk prefix should fail")
            .to_string();
        assert!(error.contains("marked completed but no files were found"));
    }

    #[test]
    fn test_validate_chunk_schema_files_skips_absent_schema() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];

        assert!(!validate_chunk_schema_files(&chunk, "metrics", &HashSet::new()).unwrap());
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_failed_chunk_before_dry_run() {
        let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
        failed.status = ChunkStatus::Failed;

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[failed], &["public".to_string()])
            .await
            .expect_err("failed chunk should reject dry-run validation")
            .to_string();
        assert!(error.contains("Incomplete snapshot"));
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_missing_chunk_prefix_before_dry_run() {
        let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec!["data/public/7/a.parquet".to_string()];

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
            .await
            .expect_err("missing chunk prefix should reject dry-run validation")
            .to_string();
        assert!(error.contains("marked completed but no files were found"));
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_completed_chunk_with_empty_manifest() {
        let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
            .await
            .expect_err("empty completed chunk should reject validation")
            .to_string();
        assert!(error.contains("file manifest is empty"));
    }
}