1use std::collections::HashSet;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use snafu::ResultExt;
25
26use crate::Tool;
27use crate::common::ObjectStoreConfig;
28use crate::data::export_v2::manifest::MANIFEST_VERSION;
29use crate::data::import_v2::error::{
30 FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result,
31 SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
32};
33use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
34use crate::data::path::ddl_path_for_schema;
35use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
36use crate::database::{DatabaseClient, parse_proxy_opts};
37
38#[derive(Debug, Parser)]
40pub struct ImportV2Command {
41 #[clap(long)]
43 addr: String,
44
45 #[clap(long)]
47 from: String,
48
49 #[clap(long, default_value = "greptime")]
51 catalog: String,
52
53 #[clap(long, value_delimiter = ',')]
56 schemas: Vec<String>,
57
58 #[clap(long)]
60 dry_run: bool,
61
62 #[clap(long)]
64 auth_basic: Option<String>,
65
66 #[clap(long, value_parser = humantime::parse_duration)]
68 timeout: Option<Duration>,
69
70 #[clap(long)]
75 proxy: Option<String>,
76
77 #[clap(long)]
81 no_proxy: bool,
82
83 #[clap(flatten)]
85 storage: ObjectStoreConfig,
86}
87
88impl ImportV2Command {
89 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
90 validate_uri(&self.from)
92 .context(SnapshotStorageSnafu)
93 .map_err(BoxedError::new)?;
94
95 let schemas = if self.schemas.is_empty() {
97 None
98 } else {
99 Some(self.schemas.clone())
100 };
101
102 let storage = OpenDalStorage::from_uri(&self.from, &self.storage)
104 .context(SnapshotStorageSnafu)
105 .map_err(BoxedError::new)?;
106
107 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
109 let database_client = DatabaseClient::new(
110 self.addr.clone(),
111 self.catalog.clone(),
112 self.auth_basic.clone(),
113 self.timeout.unwrap_or(Duration::from_secs(60)),
114 proxy,
115 self.no_proxy,
116 );
117
118 Ok(Box::new(Import {
119 schemas,
120 dry_run: self.dry_run,
121 storage: Box::new(storage),
122 database_client,
123 }))
124 }
125}
126
/// Schema import task created by `ImportV2Command::build` and executed through `Tool::do_work`.
pub struct Import {
    /// Schemas requested via `--schemas`; `None` imports every schema listed in the manifest.
    schemas: Option<Vec<String>>,
    /// When true, the DDL statements are printed instead of executed.
    dry_run: bool,
    /// Snapshot storage the manifest and the per-schema DDL files are read from.
    storage: Box<dyn SnapshotStorage>,
    /// Client handed to `DdlExecutor` to execute the DDL statements.
    database_client: DatabaseClient,
}
134
135#[async_trait]
136impl Tool for Import {
137 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
138 self.run().await.map_err(BoxedError::new)
139 }
140}
141
142impl Import {
143 async fn run(&self) -> Result<()> {
144 let manifest = self
146 .storage
147 .read_manifest()
148 .await
149 .context(SnapshotStorageSnafu)?;
150
151 info!(
152 "Loading snapshot: {} (version: {}, schema_only: {})",
153 manifest.snapshot_id, manifest.version, manifest.schema_only
154 );
155
156 if manifest.version != MANIFEST_VERSION {
158 return ManifestVersionMismatchSnafu {
159 expected: MANIFEST_VERSION,
160 found: manifest.version,
161 }
162 .fail();
163 }
164
165 info!("Snapshot contains {} schema(s)", manifest.schemas.len());
166
167 if !manifest.schema_only && !manifest.chunks.is_empty() {
168 return FullSnapshotImportNotSupportedSnafu {
169 chunk_count: manifest.chunks.len(),
170 }
171 .fail();
172 }
173
174 let schemas_to_import = match &self.schemas {
176 Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
177 None => manifest.schemas.clone(),
178 };
179
180 info!("Importing schemas: {:?}", schemas_to_import);
181
182 let ddl_statements = self.read_ddl_statements(&schemas_to_import).await?;
184
185 info!("Generated {} DDL statements", ddl_statements.len());
186
187 if self.dry_run {
189 info!("Dry-run mode - DDL statements to execute:");
190 println!();
191 for (i, stmt) in ddl_statements.iter().enumerate() {
192 println!("-- Statement {}", i + 1);
193 println!("{};", stmt.sql);
194 println!();
195 }
196 return Ok(());
197 }
198
199 let executor = DdlExecutor::new(&self.database_client);
201 executor.execute_strict(&ddl_statements).await?;
202
203 info!(
204 "Import completed: {} DDL statements executed",
205 ddl_statements.len()
206 );
207
208 Ok(())
209 }
210
211 async fn read_ddl_statements(&self, schemas: &[String]) -> Result<Vec<DdlStatement>> {
212 let mut statements = Vec::new();
213 for schema in schemas {
214 let path = ddl_path_for_schema(schema);
215 let content = self
216 .storage
217 .read_text(&path)
218 .await
219 .context(SnapshotStorageSnafu)?;
220 statements.extend(
221 parse_ddl_statements(&content)
222 .into_iter()
223 .map(|sql| ddl_statement_for_schema(schema, sql)),
224 );
225 }
226
227 Ok(statements)
228 }
229}
230
/// Splits a DDL script into individual statements on top-level `;`.
///
/// A `;` does not end a statement when it appears inside any of these:
/// - single-quoted string literals;
/// - double-quoted or backtick-quoted identifiers;
/// - `-- line comments`;
/// - `/* block comments */`.
///
/// Inside a quoted span, a doubled quote character (`''`, `""`, ` `` `) is an
/// escaped quote and does not close the span.
///
/// Comments are removed from the output. A line comment is replaced by its
/// terminating newline and a block comment by a single space, so the tokens on
/// either side of a comment never get glued together.
///
/// Every returned statement is trimmed and carries no trailing `;`. Empty
/// statements are skipped.
fn parse_ddl_statements(content: &str) -> Vec<String> {
    let mut statements = Vec::new();
    let mut current = String::new();
    let mut chars = content.chars().peekable();
    // Quote character of the quoted span currently being read, if any.
    let mut open_quote: Option<char> = None;
    let mut in_line_comment = false;
    let mut in_block_comment = false;

    while let Some(ch) = chars.next() {
        if in_line_comment {
            if ch == '\n' {
                in_line_comment = false;
                current.push('\n');
            }
            continue;
        }

        if in_block_comment {
            if ch == '*' && chars.peek() == Some(&'/') {
                chars.next();
                in_block_comment = false;
                // Keep tokens separated: `CREATE/*c*/TABLE` must not become `CREATETABLE`.
                current.push(' ');
            }
            continue;
        }

        if let Some(quote) = open_quote {
            current.push(ch);
            if ch == quote {
                if chars.peek() == Some(&quote) {
                    // Doubled quote: an escaped quote character, the span stays open.
                    chars.next();
                    current.push(quote);
                } else {
                    open_quote = None;
                }
            }
            continue;
        }

        match ch {
            '-' if chars.peek() == Some(&'-') => {
                chars.next();
                in_line_comment = true;
            }
            '/' if chars.peek() == Some(&'*') => {
                chars.next();
                in_block_comment = true;
            }
            // String literal, standard quoted identifier, or MySQL-style quoted identifier.
            '\'' | '"' | '`' => {
                open_quote = Some(ch);
                current.push(ch);
            }
            ';' => {
                let statement = current.trim();
                if !statement.is_empty() {
                    statements.push(statement.to_string());
                }
                current.clear();
            }
            _ => current.push(ch),
        }
    }

    // Trailing statement without a terminating `;`.
    let statement = current.trim();
    if !statement.is_empty() {
        statements.push(statement.to_string());
    }

    statements
}
316
317fn ddl_statement_for_schema(schema: &str, sql: String) -> DdlStatement {
318 if is_schema_scoped_statement(&sql) {
319 DdlStatement::with_execution_schema(sql, schema.to_string())
320 } else {
321 DdlStatement::new(sql)
322 }
323}
324
/// Returns `true` when `sql` has one of these shapes (leading whitespace is ignored):
/// - `CREATE [OR REPLACE] [EXTERNAL] TABLE …`
/// - `CREATE [OR REPLACE] [EXTERNAL] VIEW …`
///
/// Keywords are matched ASCII case-insensitively and only as whole words.
/// A dangling `OR` that is not followed by `REPLACE` yields `false`, as does
/// any other statement (e.g. `CREATE DATABASE`).
fn is_schema_scoped_statement(sql: &str) -> bool {
    /// Removes `keyword` (ASCII case-insensitive, whole word) from the front of
    /// `input` and returns the remainder without leading whitespace, or `None`
    /// when `input` does not start with that keyword.
    fn eat_keyword<'a>(input: &'a str, keyword: &str) -> Option<&'a str> {
        let head = input.get(..keyword.len())?;
        if !head.eq_ignore_ascii_case(keyword) {
            return None;
        }
        // The keyword must not be a prefix of a longer identifier (`TABLESPACE`, `CREATED`).
        let at_boundary = match input.as_bytes().get(keyword.len()) {
            Some(&next) => !next.is_ascii_alphanumeric() && next != b'_',
            None => true,
        };
        if !at_boundary {
            return None;
        }
        input.get(keyword.len()..).map(str::trim_start)
    }

    let Some(mut rest) = eat_keyword(sql.trim_start(), "CREATE") else {
        return false;
    };

    // `OR` is only legal as the first half of `OR REPLACE`.
    if let Some(after_or) = eat_keyword(rest, "OR") {
        let Some(after_replace) = eat_keyword(after_or, "REPLACE") else {
            return false;
        };
        rest = after_replace;
    }

    if let Some(after_external) = eat_keyword(rest, "EXTERNAL") {
        rest = after_external;
    }

    ["TABLE", "VIEW"]
        .iter()
        .any(|object| eat_keyword(rest, object).is_some())
}
358
/// Returns `true` when `input` begins with `keyword` (ASCII case-insensitive)
/// that ends a word.
///
/// The keyword ends a word when it is followed either by end of input or by a
/// byte that is not an ASCII letter, digit or `_`. A prefix length that does
/// not fall on a UTF-8 character boundary of `input` never matches.
fn starts_with_keyword(input: &str, keyword: &str) -> bool {
    let width = keyword.len();
    let Some(head) = input.get(..width) else {
        return false;
    };
    if !head.eq_ignore_ascii_case(keyword) {
        return false;
    }
    // Word boundary: `TABLESPACE` must not match `TABLE`.
    match input.as_bytes().get(width) {
        Some(&next) => !(next.is_ascii_alphanumeric() || next == b'_'),
        None => true,
    }
}
370
371fn canonicalize_schema_filter(
372 filter: &[String],
373 manifest_schemas: &[String],
374) -> Result<Vec<String>> {
375 let mut canonicalized = Vec::new();
376 let mut seen = HashSet::new();
377
378 for schema in filter {
379 let canonical = manifest_schemas
380 .iter()
381 .find(|candidate| candidate.eq_ignore_ascii_case(schema))
382 .cloned()
383 .ok_or_else(|| {
384 SchemaNotInSnapshotSnafu {
385 schema: schema.clone(),
386 }
387 .build()
388 })?;
389
390 if seen.insert(canonical.to_ascii_lowercase()) {
391 canonicalized.push(canonical);
392 }
393 }
394
395 Ok(canonicalized)
396}
397
/// Unit tests for the import command: manifest validation, DDL splitting,
/// execution-schema binding and schema-filter canonicalization.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_trait::async_trait;

    use super::*;
    use crate::Tool;
    use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange};
    use crate::data::export_v2::schema::SchemaSnapshot;
    use crate::data::snapshot_storage::SnapshotStorage;
    use crate::database::DatabaseClient;

    /// In-memory storage that only serves a fixed manifest. Every other
    /// operation panics via `unimplemented!`, so a test using it also proves
    /// that no DDL file (or anything else) was touched.
    struct StubStorage {
        manifest: Manifest,
    }

    #[async_trait]
    impl SnapshotStorage for StubStorage {
        async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
            Ok(true)
        }

        async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
            Ok(self.manifest.clone())
        }

        async fn write_manifest(
            &self,
            _manifest: &Manifest,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_text(
            &self,
            _path: &str,
            _content: &str,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_schema(
            &self,
            _snapshot: &SchemaSnapshot,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn list_files_recursive(
            &self,
            _prefix: &str,
        ) -> crate::data::export_v2::error::Result<Vec<String>> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }
    }

    /// Client pointing at a local address with a short timeout; the tests below
    /// fail before any statement would be sent through it.
    fn test_database_client() -> DatabaseClient {
        DatabaseClient::new(
            "127.0.0.1:4000".to_string(),
            "greptime".to_string(),
            None,
            Duration::from_secs(1),
            None,
            false,
        )
    }

    // A full snapshot with data chunks must be rejected before any DDL is read:
    // `StubStorage::read_text` would panic if the import got that far.
    #[tokio::test]
    async fn test_import_rejects_full_snapshot_before_schema_execution() {
        let mut manifest = Manifest::new_full(
            "greptime".to_string(),
            vec!["public".to_string()],
            TimeRange::unbounded(),
            DataFormat::Parquet,
        );
        manifest
            .chunks
            .push(ChunkMeta::new(1, TimeRange::unbounded()));

        let import = Import {
            schemas: None,
            dry_run: false,
            storage: Box::new(StubStorage { manifest }),
            database_client: test_database_client(),
        };

        let error = import
            .do_work()
            .await
            .expect_err("full snapshot import should fail");

        assert!(
            error
                .to_string()
                .contains("Importing data from full snapshots is not implemented yet")
        );
    }

    // Line comments and blank lines are dropped and each `;` ends a statement.
    #[test]
    fn test_parse_ddl_statements() {
        let content = r#"
-- Schema: public
CREATE DATABASE public;
CREATE TABLE t (ts TIMESTAMP TIME INDEX, host STRING, PRIMARY KEY (host)) ENGINE=mito;

-- comment
CREATE VIEW v AS SELECT * FROM t;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 3);
        assert!(statements[0].starts_with("CREATE DATABASE public"));
        assert!(statements[1].starts_with("CREATE TABLE t"));
        assert!(statements[2].starts_with("CREATE VIEW v"));
    }

    // A `;` inside a single-quoted literal must not split the statement.
    #[test]
    fn test_parse_ddl_statements_preserves_semicolons_in_string_literals() {
        let content = r#"
CREATE TABLE t (
    host STRING DEFAULT 'a;b'
);
CREATE VIEW v AS SELECT ';' AS marker;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].contains("'a;b'"));
        assert!(statements[1].contains("';' AS marker"));
    }

    // A `;` inside a block comment must not produce an extra statement.
    #[test]
    fn test_parse_ddl_statements_handles_comments_without_splitting() {
        let content = r#"
-- leading comment
CREATE TABLE t (ts TIMESTAMP TIME INDEX); /* block; comment */
CREATE VIEW v AS SELECT 1;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].starts_with("CREATE TABLE t"));
        assert!(statements[1].starts_with("CREATE VIEW v"));
    }

    // Filter entries resolve to the names stored in the manifest, in filter order.
    #[test]
    fn test_canonicalize_schema_filter_uses_manifest_casing() {
        let filter = vec!["TEST_DB".to_string(), "PUBLIC".to_string()];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    // Entries that resolve to the same manifest schema appear only once.
    #[test]
    fn test_canonicalize_schema_filter_dedupes_case_insensitive_matches() {
        let filter = vec![
            "TEST_DB".to_string(),
            "test_db".to_string(),
            "PUBLIC".to_string(),
            "public".to_string(),
        ];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    // An unknown schema is an error whose message names the schema.
    #[test]
    fn test_canonicalize_schema_filter_rejects_missing_schema() {
        let filter = vec!["missing".to_string()];
        let manifest_schemas = vec!["test_db".to_string()];

        let error = canonicalize_schema_filter(&filter, &manifest_schemas)
            .expect_err("missing schema should fail")
            .to_string();

        assert!(error.contains("missing"));
    }

    // The tests below pin which statements are bound to the schema's execution context.
    #[test]
    fn test_ddl_statement_for_schema_create_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX) ENGINE=mito".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_or_replace_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE OR REPLACE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_external_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE EXTERNAL TABLE IF NOT EXISTS ext_metrics (ts TIMESTAMP TIME INDEX) ENGINE=file"
                .to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    // `CREATE DATABASE` is not schema-scoped, so no execution schema is attached.
    #[test]
    fn test_ddl_statement_for_schema_create_database_uses_public_context() {
        let stmt = ddl_statement_for_schema("test_db", "CREATE DATABASE test_db".to_string());
        assert_eq!(stmt.execution_schema, None);
    }

    // Keywords only match as whole words.
    #[test]
    fn test_starts_with_keyword_requires_word_boundary() {
        assert!(starts_with_keyword("CREATE TABLE t", "CREATE"));
        assert!(!starts_with_keyword("CREATED TABLE t", "CREATE"));
        assert!(!starts_with_keyword("TABLESPACE foo", "TABLE"));
    }
}