Skip to main content

cli/data/import_v2/
command.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Import V2 CLI command.
16
17use std::collections::HashSet;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use snafu::ResultExt;
25
26use crate::Tool;
27use crate::common::ObjectStoreConfig;
28use crate::data::export_v2::manifest::MANIFEST_VERSION;
29use crate::data::import_v2::error::{
30    FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result,
31    SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
32};
33use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
34use crate::data::path::ddl_path_for_schema;
35use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
36use crate::database::{DatabaseClient, parse_proxy_opts};
37
38/// Import from a snapshot.
39#[derive(Debug, Parser)]
40pub struct ImportV2Command {
41    /// Server address to connect (e.g., 127.0.0.1:4000).
42    #[clap(long)]
43    addr: String,
44
45    /// Source snapshot location (e.g., s3://bucket/path, file:///tmp/backup).
46    #[clap(long)]
47    from: String,
48
49    /// Target catalog name.
50    #[clap(long, default_value = "greptime")]
51    catalog: String,
52
53    /// Schema list to import (default: all in snapshot).
54    /// Can be specified multiple times or comma-separated.
55    #[clap(long, value_delimiter = ',')]
56    schemas: Vec<String>,
57
58    /// Verify without importing (dry-run).
59    #[clap(long)]
60    dry_run: bool,
61
62    /// Basic authentication (user:password).
63    #[clap(long)]
64    auth_basic: Option<String>,
65
66    /// Request timeout.
67    #[clap(long, value_parser = humantime::parse_duration)]
68    timeout: Option<Duration>,
69
70    /// Proxy server address.
71    ///
72    /// If set, it overrides the system proxy unless `--no-proxy` is specified.
73    /// If neither `--proxy` nor `--no-proxy` is set, system proxy (env) may be used.
74    #[clap(long)]
75    proxy: Option<String>,
76
77    /// Disable all proxy usage (ignores `--proxy` and system proxy).
78    ///
79    /// When set and `--proxy` is not provided, this explicitly disables system proxy.
80    #[clap(long)]
81    no_proxy: bool,
82
83    /// Object store configuration for remote storage backends.
84    #[clap(flatten)]
85    storage: ObjectStoreConfig,
86}
87
88impl ImportV2Command {
89    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
90        // Validate URI format
91        validate_uri(&self.from)
92            .context(SnapshotStorageSnafu)
93            .map_err(BoxedError::new)?;
94
95        // Parse schemas (empty vec means all schemas)
96        let schemas = if self.schemas.is_empty() {
97            None
98        } else {
99            Some(self.schemas.clone())
100        };
101
102        // Build storage
103        let storage = OpenDalStorage::from_uri(&self.from, &self.storage)
104            .context(SnapshotStorageSnafu)
105            .map_err(BoxedError::new)?;
106
107        // Build database client
108        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
109        let database_client = DatabaseClient::new(
110            self.addr.clone(),
111            self.catalog.clone(),
112            self.auth_basic.clone(),
113            self.timeout.unwrap_or(Duration::from_secs(60)),
114            proxy,
115            self.no_proxy,
116        );
117
118        Ok(Box::new(Import {
119            schemas,
120            dry_run: self.dry_run,
121            storage: Box::new(storage),
122            database_client,
123        }))
124    }
125}
126
127/// Import tool implementation.
128pub struct Import {
129    schemas: Option<Vec<String>>,
130    dry_run: bool,
131    storage: Box<dyn SnapshotStorage>,
132    database_client: DatabaseClient,
133}
134
135#[async_trait]
136impl Tool for Import {
137    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
138        self.run().await.map_err(BoxedError::new)
139    }
140}
141
142impl Import {
143    async fn run(&self) -> Result<()> {
144        // 1. Read manifest
145        let manifest = self
146            .storage
147            .read_manifest()
148            .await
149            .context(SnapshotStorageSnafu)?;
150
151        info!(
152            "Loading snapshot: {} (version: {}, schema_only: {})",
153            manifest.snapshot_id, manifest.version, manifest.schema_only
154        );
155
156        // Check version compatibility
157        if manifest.version != MANIFEST_VERSION {
158            return ManifestVersionMismatchSnafu {
159                expected: MANIFEST_VERSION,
160                found: manifest.version,
161            }
162            .fail();
163        }
164
165        info!("Snapshot contains {} schema(s)", manifest.schemas.len());
166
167        if !manifest.schema_only && !manifest.chunks.is_empty() {
168            return FullSnapshotImportNotSupportedSnafu {
169                chunk_count: manifest.chunks.len(),
170            }
171            .fail();
172        }
173
174        // 2. Determine schemas to import
175        let schemas_to_import = match &self.schemas {
176            Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
177            None => manifest.schemas.clone(),
178        };
179
180        info!("Importing schemas: {:?}", schemas_to_import);
181
182        // 3. Read DDL statements
183        let ddl_statements = self.read_ddl_statements(&schemas_to_import).await?;
184
185        info!("Generated {} DDL statements", ddl_statements.len());
186
187        // 4. Dry-run mode: print DDL and exit
188        if self.dry_run {
189            info!("Dry-run mode - DDL statements to execute:");
190            println!();
191            for (i, stmt) in ddl_statements.iter().enumerate() {
192                println!("-- Statement {}", i + 1);
193                println!("{};", stmt.sql);
194                println!();
195            }
196            return Ok(());
197        }
198
199        // 5. Execute DDL
200        let executor = DdlExecutor::new(&self.database_client);
201        executor.execute_strict(&ddl_statements).await?;
202
203        info!(
204            "Import completed: {} DDL statements executed",
205            ddl_statements.len()
206        );
207
208        Ok(())
209    }
210
211    async fn read_ddl_statements(&self, schemas: &[String]) -> Result<Vec<DdlStatement>> {
212        let mut statements = Vec::new();
213        for schema in schemas {
214            let path = ddl_path_for_schema(schema);
215            let content = self
216                .storage
217                .read_text(&path)
218                .await
219                .context(SnapshotStorageSnafu)?;
220            statements.extend(
221                parse_ddl_statements(&content)
222                    .into_iter()
223                    .map(|sql| ddl_statement_for_schema(schema, sql)),
224            );
225        }
226
227        Ok(statements)
228    }
229}
230
/// Splits a SQL/DDL script into individual statements at top-level `;`.
///
/// Quoted spans are kept verbatim: single-quoted strings, double-quoted identifiers
/// and backtick-quoted identifiers. Semicolons and comment markers inside them do
/// not split or truncate the statement. Within such a span, a doubled quote
/// character (e.g. `''`) is an escaped quote rather than the end of the span.
///
/// `--` line comments and `/* */` block comments outside quotes are removed. A line
/// comment keeps its terminating newline, and a block comment is replaced by a
/// single space. Tokens on either side of a comment therefore stay separated, so
/// `CREATE/*c*/TABLE` becomes `CREATE TABLE`.
///
/// Every statement is trimmed and blank statements are dropped. Trailing text
/// without a final `;` becomes the last statement. An unterminated quote or block
/// comment extends to the end of the input.
fn parse_ddl_statements(content: &str) -> Vec<String> {
    /// Lexical context of the scanner.
    #[derive(Clone, Copy)]
    enum State {
        /// Ordinary SQL text.
        Code,
        /// Inside a span delimited by the contained quote character.
        Quoted(char),
        /// Inside a `--` comment, up to the next newline.
        LineComment,
        /// Inside a `/* ... */` comment.
        BlockComment,
    }

    /// Appends the trimmed `buffer` to `statements` unless it is blank.
    fn push_statement(statements: &mut Vec<String>, buffer: &str) {
        let statement = buffer.trim();
        if !statement.is_empty() {
            statements.push(statement.to_string());
        }
    }

    let mut statements = Vec::new();
    let mut current = String::new();
    let mut state = State::Code;
    let mut chars = content.chars().peekable();

    while let Some(ch) = chars.next() {
        match state {
            State::LineComment => {
                if ch == '\n' {
                    // Keep the newline so the comment still separates tokens.
                    current.push('\n');
                    state = State::Code;
                }
            }
            State::BlockComment => {
                if ch == '*' && chars.peek() == Some(&'/') {
                    chars.next();
                    state = State::Code;
                }
            }
            State::Quoted(quote) => {
                current.push(ch);
                if ch == quote {
                    // A doubled quote is an escaped quote, not the closing one.
                    if chars.peek() == Some(&quote) {
                        chars.next();
                        current.push(quote);
                    } else {
                        state = State::Code;
                    }
                }
            }
            State::Code => match ch {
                '-' if chars.peek() == Some(&'-') => {
                    chars.next();
                    state = State::LineComment;
                }
                '/' if chars.peek() == Some(&'*') => {
                    chars.next();
                    // SQL treats a comment as whitespace; without this space
                    // `CREATE/*c*/TABLE` would collapse into `CREATETABLE`.
                    current.push(' ');
                    state = State::BlockComment;
                }
                '\'' | '"' | '`' => {
                    current.push(ch);
                    state = State::Quoted(ch);
                }
                ';' => {
                    push_statement(&mut statements, &current);
                    current.clear();
                }
                _ => current.push(ch),
            },
        }
    }

    push_statement(&mut statements, &current);
    statements
}
316
317fn ddl_statement_for_schema(schema: &str, sql: String) -> DdlStatement {
318    if is_schema_scoped_statement(&sql) {
319        DdlStatement::with_execution_schema(sql, schema.to_string())
320    } else {
321        DdlStatement::new(sql)
322    }
323}
324
325fn is_schema_scoped_statement(sql: &str) -> bool {
326    let trimmed = sql.trim_start();
327    if !starts_with_keyword(trimmed, "CREATE") {
328        return false;
329    }
330
331    let Some(rest) = trimmed.get("CREATE".len()..) else {
332        return false;
333    };
334    let mut rest = rest.trim_start();
335    if starts_with_keyword(rest, "OR") {
336        let Some(next) = rest.get("OR".len()..) else {
337            return false;
338        };
339        rest = next.trim_start();
340        if !starts_with_keyword(rest, "REPLACE") {
341            return false;
342        }
343        let Some(next) = rest.get("REPLACE".len()..) else {
344            return false;
345        };
346        rest = next.trim_start();
347    }
348
349    if starts_with_keyword(rest, "EXTERNAL") {
350        let Some(next) = rest.get("EXTERNAL".len()..) else {
351            return false;
352        };
353        rest = next.trim_start();
354    }
355
356    starts_with_keyword(rest, "TABLE") || starts_with_keyword(rest, "VIEW")
357}
358
/// Returns `true` if `input` begins with `keyword` as a whole word.
///
/// The keyword is compared ASCII-case-insensitively. It must be followed by the end
/// of `input` or by a character that cannot continue an identifier, i.e. anything
/// other than a Unicode alphanumeric character or `_`. The check looks at the whole
/// next character, not just its first byte. As a result, `TABLE` matches neither
/// `TABLESPACE` nor a non-ASCII identifier such as `TABLEé`.
fn starts_with_keyword(input: &str, keyword: &str) -> bool {
    let split = keyword.len();
    // Both lookups fail together when `split` is past the end or not on a char boundary.
    let (Some(head), Some(tail)) = (input.get(..split), input.get(split..)) else {
        return false;
    };
    if !head.eq_ignore_ascii_case(keyword) {
        return false;
    }
    match tail.chars().next() {
        Some(next) => !(next.is_alphanumeric() || next == '_'),
        None => true,
    }
}
370
371fn canonicalize_schema_filter(
372    filter: &[String],
373    manifest_schemas: &[String],
374) -> Result<Vec<String>> {
375    let mut canonicalized = Vec::new();
376    let mut seen = HashSet::new();
377
378    for schema in filter {
379        let canonical = manifest_schemas
380            .iter()
381            .find(|candidate| candidate.eq_ignore_ascii_case(schema))
382            .cloned()
383            .ok_or_else(|| {
384                SchemaNotInSnapshotSnafu {
385                    schema: schema.clone(),
386                }
387                .build()
388            })?;
389
390        if seen.insert(canonical.to_ascii_lowercase()) {
391            canonicalized.push(canonical);
392        }
393    }
394
395    Ok(canonicalized)
396}
397
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_trait::async_trait;

    use super::*;
    use crate::Tool;
    use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange};
    use crate::data::export_v2::schema::SchemaSnapshot;
    use crate::data::snapshot_storage::SnapshotStorage;
    use crate::database::DatabaseClient;

    /// Result type returned by the `SnapshotStorage` methods.
    type StorageResult<T> = crate::data::export_v2::error::Result<T>;

    /// Storage double that only serves a fixed manifest; every other operation
    /// panics because the tests in this module never reach it.
    struct StubStorage {
        manifest: Manifest,
    }

    #[async_trait]
    impl SnapshotStorage for StubStorage {
        async fn exists(&self) -> StorageResult<bool> {
            Ok(true)
        }

        async fn read_manifest(&self) -> StorageResult<Manifest> {
            Ok(self.manifest.clone())
        }

        async fn write_manifest(&self, _manifest: &Manifest) -> StorageResult<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn read_text(&self, _path: &str) -> StorageResult<String> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_text(&self, _path: &str, _content: &str) -> StorageResult<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_schema(&self, _snapshot: &SchemaSnapshot) -> StorageResult<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn create_dir_all(&self, _path: &str) -> StorageResult<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn list_files_recursive(&self, _prefix: &str) -> StorageResult<Vec<String>> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn delete_snapshot(&self) -> StorageResult<()> {
            unimplemented!("not needed in import_v2::command tests")
        }
    }

    /// Client aimed at a local address, for imports expected to stop before any
    /// statement is executed.
    fn local_database_client() -> DatabaseClient {
        DatabaseClient::new(
            "127.0.0.1:4000".to_string(),
            "greptime".to_string(),
            None,
            Duration::from_secs(1),
            None,
            false,
        )
    }

    /// Turns string literals into the owned names the functions under test take.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Whether `sql`, imported for schema `test_db`, is pinned to that schema.
    fn runs_in_test_db(sql: &str) -> bool {
        ddl_statement_for_schema("test_db", sql.to_string())
            .execution_schema
            .as_deref()
            == Some("test_db")
    }

    #[tokio::test]
    async fn test_import_rejects_full_snapshot_before_schema_execution() {
        let mut manifest = Manifest::new_full(
            "greptime".to_string(),
            vec!["public".to_string()],
            TimeRange::unbounded(),
            DataFormat::Parquet,
        );
        manifest.chunks.push(ChunkMeta::new(1, TimeRange::unbounded()));

        let import = Import {
            schemas: None,
            dry_run: false,
            storage: Box::new(StubStorage { manifest }),
            database_client: local_database_client(),
        };

        let message = import
            .do_work()
            .await
            .expect_err("full snapshot import should fail")
            .to_string();
        assert!(message.contains("Importing data from full snapshots is not implemented yet"));
    }

    #[test]
    fn test_parse_ddl_statements() {
        let content = r#"
-- Schema: public
CREATE DATABASE public;
CREATE TABLE t (ts TIMESTAMP TIME INDEX, host STRING, PRIMARY KEY (host)) ENGINE=mito;

-- comment
CREATE VIEW v AS SELECT * FROM t;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 3);
        let prefixes = ["CREATE DATABASE public", "CREATE TABLE t", "CREATE VIEW v"];
        for (statement, prefix) in statements.iter().zip(prefixes) {
            assert!(statement.starts_with(prefix));
        }
    }

    #[test]
    fn test_parse_ddl_statements_preserves_semicolons_in_string_literals() {
        let content = r#"
CREATE TABLE t (
    host STRING DEFAULT 'a;b'
);
CREATE VIEW v AS SELECT ';' AS marker;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 2);
        let (table, view) = (&statements[0], &statements[1]);
        assert!(table.contains("'a;b'"));
        assert!(view.contains("';' AS marker"));
    }

    #[test]
    fn test_parse_ddl_statements_handles_comments_without_splitting() {
        let content = r#"
-- leading comment
CREATE TABLE t (ts TIMESTAMP TIME INDEX); /* block; comment */
CREATE VIEW v AS SELECT 1;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 2);
        let (table, view) = (&statements[0], &statements[1]);
        assert!(table.starts_with("CREATE TABLE t"));
        assert!(view.starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_canonicalize_schema_filter_uses_manifest_casing() {
        let canonicalized = canonicalize_schema_filter(
            &owned(&["TEST_DB", "PUBLIC"]),
            &owned(&["test_db", "public"]),
        )
        .unwrap();
        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_dedupes_case_insensitive_matches() {
        let canonicalized = canonicalize_schema_filter(
            &owned(&["TEST_DB", "test_db", "PUBLIC", "public"]),
            &owned(&["test_db", "public"]),
        )
        .unwrap();
        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_rejects_missing_schema() {
        let error = canonicalize_schema_filter(&owned(&["missing"]), &owned(&["test_db"]))
            .expect_err("missing schema should fail")
            .to_string();
        assert!(error.contains("missing"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_table_uses_execution_schema() {
        assert!(runs_in_test_db(
            "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX) ENGINE=mito"
        ));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_view_uses_execution_schema() {
        assert!(runs_in_test_db(
            "CREATE VIEW metrics_view AS SELECT * FROM metrics"
        ));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_or_replace_view_uses_execution_schema() {
        assert!(runs_in_test_db(
            "CREATE OR REPLACE VIEW metrics_view AS SELECT * FROM metrics"
        ));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_external_table_uses_execution_schema() {
        assert!(runs_in_test_db(
            "CREATE EXTERNAL TABLE IF NOT EXISTS ext_metrics (ts TIMESTAMP TIME INDEX) ENGINE=file"
        ));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_database_uses_public_context() {
        let stmt = ddl_statement_for_schema("test_db", "CREATE DATABASE test_db".to_string());
        assert!(stmt.execution_schema.is_none());
    }

    #[test]
    fn test_starts_with_keyword_requires_word_boundary() {
        assert!(starts_with_keyword("CREATE TABLE t", "CREATE"));
        for (input, keyword) in [("CREATED TABLE t", "CREATE"), ("TABLESPACE foo", "TABLE")] {
            assert!(!starts_with_keyword(input, keyword));
        }
    }
}
639        assert!(starts_with_keyword("CREATE TABLE t", "CREATE"));
640        assert!(!starts_with_keyword("CREATED TABLE t", "CREATE"));
641        assert!(!starts_with_keyword("TABLESPACE foo", "TABLE"));
642    }
643}