cli/data/import_v2/
executor.rs1use common_telemetry::info;
18use snafu::ResultExt;
19
20use crate::data::import_v2::error::{DatabaseSnafu, Result};
21use crate::database::DatabaseClient;
22
/// A single DDL statement together with the optional schema it should be
/// executed in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DdlStatement {
    /// The SQL text of the statement.
    pub sql: String,
    /// Schema to execute the statement in, or `None` when no schema was
    /// specified for it.
    pub execution_schema: Option<String>,
}

impl DdlStatement {
    /// Creates a statement with no execution schema attached.
    pub fn new(sql: String) -> Self {
        let execution_schema = None;
        DdlStatement {
            sql,
            execution_schema,
        }
    }

    /// Creates a statement that carries `schema` as its execution schema.
    pub fn with_execution_schema(sql: String, schema: String) -> Self {
        // Build the schema-less statement first, then override the one field
        // that differs.
        DdlStatement {
            execution_schema: Some(schema),
            ..Self::new(sql)
        }
    }
}
45
46pub struct DdlExecutor<'a> {
48 client: &'a DatabaseClient,
49}
50
51impl<'a> DdlExecutor<'a> {
52 pub fn new(client: &'a DatabaseClient) -> Self {
54 Self { client }
55 }
56
57 pub async fn execute_strict(&self, statements: &[DdlStatement]) -> Result<()> {
59 let total = statements.len();
60
61 for (i, stmt) in statements.iter().enumerate() {
62 let preview = preview_sql(&stmt.sql);
63
64 info!("Executing DDL ({}/{}): {}", i + 1, total, preview);
65
66 if let Some(schema) = stmt.execution_schema.as_deref() {
67 self.client
68 .sql(&stmt.sql, schema)
69 .await
70 .context(DatabaseSnafu)?;
71 } else {
72 self.client
73 .sql_in_public(&stmt.sql)
74 .await
75 .context(DatabaseSnafu)?;
76 }
77 }
78
79 Ok(())
80 }
81}
82
/// Returns a log-friendly preview of `sql`.
///
/// Input of at most 80 characters is returned unchanged. Longer input is cut
/// after its first 80 characters (not bytes) and suffixed with `...`.
fn preview_sql(sql: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 80;

    // The byte offset of the character *after* the preview window, if one
    // exists, is exactly where the string has to be cut.
    match sql.char_indices().nth(MAX_PREVIEW_CHARS) {
        Some((cut, _)) => format!("{}...", &sql[..cut]),
        None => sql.to_owned(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A statement built with `new` carries no execution schema.
    #[test]
    fn test_statement_without_execution_schema_uses_public() {
        let stmt = DdlStatement::new("CREATE DATABASE IF NOT EXISTS test_db".to_string());
        assert_eq!(stmt.execution_schema, None);
    }

    /// The execution schema is stored verbatim, including embedded quotes.
    #[test]
    fn test_statement_with_execution_schema_preserves_context() {
        let stmt = DdlStatement::with_execution_schema(
            r#"CREATE TABLE IF NOT EXISTS "my""schema"."metrics" (ts TIMESTAMP TIME INDEX)"#
                .to_string(),
            r#"my"schema"#.to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some(r#"my"schema"#));
    }

    /// Long multi-byte input is cut after 80 characters, never mid-character.
    #[test]
    fn test_preview_sql_truncates_at_char_boundary() {
        let sql = format!(
            "CREATE TABLE {} (ts TIMESTAMP TIME INDEX)",
            "测".repeat(100)
        );
        let preview = preview_sql(&sql);
        assert!(preview.ends_with("..."));

        // `is_char_boundary(len)` holds for every string, so it cannot detect
        // a bad cut. Pin the exact output instead: the 13-character ASCII
        // prefix leaves room for 67 of the multi-byte characters.
        let expected = format!("CREATE TABLE {}...", "测".repeat(67));
        assert_eq!(preview, expected);
        assert_eq!(preview.chars().count(), 83);
    }

    /// Input that fits within the preview window is returned untouched.
    #[test]
    fn test_preview_sql_keeps_short_sql_unchanged() {
        let sql = "CREATE DATABASE IF NOT EXISTS test_db";
        assert_eq!(preview_sql(sql), sql);

        let exactly_at_limit = "x".repeat(80);
        assert_eq!(preview_sql(&exactly_at_limit), exactly_at_limit);
    }
}