Skip to main content

cli/data/import_v2/
executor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! DDL execution for import.
16
17use common_telemetry::info;
18use snafu::ResultExt;
19
20use crate::data::import_v2::error::{DatabaseSnafu, Result};
21use crate::database::DatabaseClient;
22
/// One DDL statement together with the schema it should be executed in.
///
/// The schema is optional: when it is `None`, [`DdlExecutor::execute_strict`] sends the
/// statement through `DatabaseClient::sql_in_public` instead of `DatabaseClient::sql`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DdlStatement {
    /// SQL text of the statement, passed to the client as-is.
    pub sql: String,
    /// Schema to execute the statement in, or `None` when no schema was specified.
    pub execution_schema: Option<String>,
}

impl DdlStatement {
    /// Builds a statement that carries no explicit execution schema.
    pub fn new(sql: String) -> Self {
        Self {
            sql,
            execution_schema: None,
        }
    }

    /// Builds a statement that is bound to `schema` for execution.
    ///
    /// `schema` is stored verbatim; no quoting or escaping is applied to it here.
    pub fn with_execution_schema(sql: String, schema: String) -> Self {
        Self {
            execution_schema: Some(schema),
            ..Self::new(sql)
        }
    }
}
45
46/// Executes DDL statements against the database.
47pub struct DdlExecutor<'a> {
48    client: &'a DatabaseClient,
49}
50
51impl<'a> DdlExecutor<'a> {
52    /// Creates a new DDL executor.
53    pub fn new(client: &'a DatabaseClient) -> Self {
54        Self { client }
55    }
56
57    /// Executes a list of DDL statements, stopping on first error.
58    pub async fn execute_strict(&self, statements: &[DdlStatement]) -> Result<()> {
59        let total = statements.len();
60
61        for (i, stmt) in statements.iter().enumerate() {
62            let preview = preview_sql(&stmt.sql);
63
64            info!("Executing DDL ({}/{}): {}", i + 1, total, preview);
65
66            if let Some(schema) = stmt.execution_schema.as_deref() {
67                self.client
68                    .sql(&stmt.sql, schema)
69                    .await
70                    .context(DatabaseSnafu)?;
71            } else {
72                self.client
73                    .sql_in_public(&stmt.sql)
74                    .await
75                    .context(DatabaseSnafu)?;
76            }
77        }
78
79        Ok(())
80    }
81}
82
/// Returns a shortened copy of `sql` suitable for a log line.
///
/// SQL of at most 80 `char`s is returned unchanged. Longer SQL is cut after its 80th
/// `char` and suffixed with `...`. The limit counts `char`s, not bytes, so the cut never
/// falls inside a multi-byte UTF-8 sequence.
fn preview_sql(sql: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 80;

    // The byte offset of the 81st char, when there is one, is where the preview ends.
    match sql.char_indices().nth(MAX_PREVIEW_CHARS) {
        Some((cut, _)) => format!("{}...", &sql[..cut]),
        None => sql.to_owned(),
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `DdlStatement` construction and `preview_sql` truncation.

    use super::*;

    #[test]
    fn test_statement_without_execution_schema_uses_public() {
        let stmt = DdlStatement::new("CREATE DATABASE IF NOT EXISTS test_db".to_string());
        assert_eq!(stmt.sql, "CREATE DATABASE IF NOT EXISTS test_db");
        assert_eq!(stmt.execution_schema, None);
    }

    #[test]
    fn test_statement_with_execution_schema_preserves_context() {
        let sql =
            r#"CREATE TABLE IF NOT EXISTS "my""schema"."metrics" (ts TIMESTAMP TIME INDEX)"#;
        let stmt =
            DdlStatement::with_execution_schema(sql.to_string(), r#"my"schema"#.to_string());
        // The SQL text and the schema name must both be stored verbatim.
        assert_eq!(stmt.sql, sql);
        assert_eq!(stmt.execution_schema.as_deref(), Some(r#"my"schema"#));
    }

    #[test]
    fn test_preview_sql_truncates_at_char_boundary() {
        let sql = format!(
            "CREATE TABLE {} (ts TIMESTAMP TIME INDEX)",
            "测".repeat(100)
        );
        let preview = preview_sql(&sql);
        // "CREATE TABLE " is 13 chars, so 67 of the multi-byte chars fit in the 80-char
        // budget. Comparing against the exact expected string pins where the cut falls.
        let expected = format!("CREATE TABLE {}...", "测".repeat(67));
        assert_eq!(preview, expected);
        assert_eq!(preview.chars().count(), 83);
    }

    #[test]
    fn test_preview_sql_keeps_sql_within_limit_intact() {
        assert_eq!(preview_sql(""), "");
        assert_eq!(preview_sql("SELECT 1"), "SELECT 1");

        // Exactly 80 chars is still within the limit and gets no ellipsis.
        let exact = "a".repeat(80);
        assert_eq!(preview_sql(&exact), exact);
    }

    #[test]
    fn test_preview_sql_truncates_one_char_over_limit() {
        let sql = "a".repeat(81);
        assert_eq!(preview_sql(&sql), format!("{}...", "a".repeat(80)));
    }
}