sqlness_runner/env/
kube.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::path::Path;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use sqlness::{Database, EnvController, QueryContext};
21use tokio::process::Command;
22use tokio::sync::Mutex;
23
24use crate::client::MultiProtocolClient;
25use crate::formatter::{ErrorFormatter, MysqlFormatter, OutputFormatter, PostgresqlFormatter};
26use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
27
28#[async_trait]
29pub trait DatabaseManager: Send + Sync {
30    // Restarts the database.
31    async fn restart(&self, database: &GreptimeDB);
32}
33
34#[async_trait]
35impl DatabaseManager for () {
36    async fn restart(&self, _: &GreptimeDB) {
37        // Do nothing
38    }
39}
40
41#[async_trait]
42pub trait ResourcesManager: Send + Sync {
43    // Delete namespace.
44    async fn delete_namespace(&self);
45
46    // Get namespace.
47    fn namespace(&self) -> &str;
48}
49
50#[derive(Clone)]
51pub struct Env {
52    /// Whether to delete the namespace on stop.
53    pub delete_namespace_on_stop: bool,
54    /// Address of the grpc server.
55    pub server_addr: String,
56    /// Address of the postgres server.
57    pub pg_server_addr: String,
58    /// Address of the mysql server.
59    pub mysql_server_addr: String,
60    /// The database manager.
61    pub database_manager: Arc<dyn DatabaseManager>,
62    /// The resources manager.
63    pub resources_manager: Arc<dyn ResourcesManager>,
64}
65
66#[async_trait]
67impl EnvController for Env {
68    type DB = GreptimeDB;
69
70    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
71        if id > 0 {
72            panic!("Parallel test mode is not supported in kube env");
73        }
74
75        match mode {
76            "standalone" | "distributed" => GreptimeDB {
77                client: Mutex::new(
78                    MultiProtocolClient::connect(
79                        &self.server_addr,
80                        &self.pg_server_addr,
81                        &self.mysql_server_addr,
82                    )
83                    .await,
84                ),
85                database_manager: self.database_manager.clone(),
86                resources_manager: self.resources_manager.clone(),
87                delete_namespace_on_stop: self.delete_namespace_on_stop,
88            },
89
90            _ => panic!("Unexpected mode: {mode}"),
91        }
92    }
93
94    async fn stop(&self, _mode: &str, database: Self::DB) {
95        database.stop().await;
96    }
97}
98
99pub struct GreptimeDB {
100    pub client: Mutex<MultiProtocolClient>,
101    pub delete_namespace_on_stop: bool,
102    pub database_manager: Arc<dyn DatabaseManager>,
103    pub resources_manager: Arc<dyn ResourcesManager>,
104}
105
106impl GreptimeDB {
107    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
108        let mut client = self.client.lock().await;
109
110        match client.postgres_query(&query).await {
111            Ok(rows) => Box::new(PostgresqlFormatter::from(rows)),
112            Err(e) => Box::new(e),
113        }
114    }
115
116    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
117        let mut client = self.client.lock().await;
118
119        match client.mysql_query(&query).await {
120            Ok(res) => Box::new(MysqlFormatter::from(res)),
121            Err(e) => Box::new(e),
122        }
123    }
124
125    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
126        let mut client = self.client.lock().await;
127
128        match client.grpc_query(&query).await {
129            Ok(rows) => Box::new(OutputFormatter::from(rows)),
130            Err(e) => Box::new(ErrorFormatter::from(e)),
131        }
132    }
133}
134
135#[async_trait]
136impl Database for GreptimeDB {
137    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
138        if ctx.context.contains_key("restart") {
139            self.database_manager.restart(self).await
140        }
141
142        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
143            // protocol is bound to be either "mysql" or "postgres"
144            if protocol == MYSQL {
145                self.mysql_query(ctx, query).await
146            } else {
147                self.postgres_query(ctx, query).await
148            }
149        } else {
150            self.grpc_query(ctx, query).await
151        }
152    }
153}
154
155impl GreptimeDB {
156    async fn stop(&self) {
157        if self.delete_namespace_on_stop {
158            self.resources_manager.delete_namespace().await;
159            println!("Deleted namespace({})", self.resources_manager.namespace());
160        } else {
161            println!(
162                "Namespace({}) is not deleted",
163                self.resources_manager.namespace()
164            );
165        }
166    }
167}
168
169pub struct NaiveResourcesManager {
170    namespace: String,
171}
172
173impl NaiveResourcesManager {
174    pub fn new(namespace: String) -> Self {
175        Self { namespace }
176    }
177}
178
179#[async_trait]
180impl ResourcesManager for NaiveResourcesManager {
181    async fn delete_namespace(&self) {
182        let output = Command::new("kubectl")
183            .arg("delete")
184            .arg("namespace")
185            .arg(&self.namespace)
186            .output()
187            .await
188            .unwrap_or_else(|e| {
189                panic!(
190                    "Failed to execute kubectl delete namespace({}): {}",
191                    self.namespace, e
192                )
193            });
194
195        if !output.status.success() {
196            let stderr = String::from_utf8_lossy(&output.stderr);
197            panic!("Failed to delete namespace({}): {}", self.namespace, stderr);
198        }
199    }
200
201    fn namespace(&self) -> &str {
202        &self.namespace
203    }
204}