sqlness_runner/env/
kube.rs1use std::fmt::Display;
16use std::path::Path;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use sqlness::{Database, EnvController, QueryContext};
21use tokio::process::Command;
22use tokio::sync::Mutex;
23
24use crate::client::MultiProtocolClient;
25use crate::formatter::{ErrorFormatter, MysqlFormatter, OutputFormatter, PostgresqlFormatter};
26use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
27
28#[async_trait]
29pub trait DatabaseManager: Send + Sync {
30 async fn restart(&self, database: &GreptimeDB);
32}
33
34#[async_trait]
35impl DatabaseManager for () {
36 async fn restart(&self, _: &GreptimeDB) {
37 }
39}
40
41#[async_trait]
42pub trait ResourcesManager: Send + Sync {
43 async fn delete_namespace(&self);
45
46 fn namespace(&self) -> &str;
48}
49
50#[derive(Clone)]
51pub struct Env {
52 pub delete_namespace_on_stop: bool,
54 pub server_addr: String,
56 pub pg_server_addr: String,
58 pub mysql_server_addr: String,
60 pub database_manager: Arc<dyn DatabaseManager>,
62 pub resources_manager: Arc<dyn ResourcesManager>,
64}
65
66#[async_trait]
67impl EnvController for Env {
68 type DB = GreptimeDB;
69
70 async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
71 if id > 0 {
72 panic!("Parallel test mode is not supported in kube env");
73 }
74
75 match mode {
76 "standalone" | "distributed" => GreptimeDB {
77 client: Mutex::new(
78 MultiProtocolClient::connect(
79 &self.server_addr,
80 &self.pg_server_addr,
81 &self.mysql_server_addr,
82 )
83 .await,
84 ),
85 database_manager: self.database_manager.clone(),
86 resources_manager: self.resources_manager.clone(),
87 delete_namespace_on_stop: self.delete_namespace_on_stop,
88 },
89
90 _ => panic!("Unexpected mode: {mode}"),
91 }
92 }
93
94 async fn stop(&self, _mode: &str, database: Self::DB) {
95 database.stop().await;
96 }
97}
98
99pub struct GreptimeDB {
100 pub client: Mutex<MultiProtocolClient>,
101 pub delete_namespace_on_stop: bool,
102 pub database_manager: Arc<dyn DatabaseManager>,
103 pub resources_manager: Arc<dyn ResourcesManager>,
104}
105
106impl GreptimeDB {
107 async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
108 let mut client = self.client.lock().await;
109
110 match client.postgres_query(&query).await {
111 Ok(rows) => Box::new(PostgresqlFormatter::from(rows)),
112 Err(e) => Box::new(e),
113 }
114 }
115
116 async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
117 let mut client = self.client.lock().await;
118
119 match client.mysql_query(&query).await {
120 Ok(res) => Box::new(MysqlFormatter::from(res)),
121 Err(e) => Box::new(e),
122 }
123 }
124
125 async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
126 let mut client = self.client.lock().await;
127
128 match client.grpc_query(&query).await {
129 Ok(rows) => Box::new(OutputFormatter::from(rows)),
130 Err(e) => Box::new(ErrorFormatter::from(e)),
131 }
132 }
133}
134
135#[async_trait]
136impl Database for GreptimeDB {
137 async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
138 if ctx.context.contains_key("restart") {
139 self.database_manager.restart(self).await
140 }
141
142 if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
143 if protocol == MYSQL {
145 self.mysql_query(ctx, query).await
146 } else {
147 self.postgres_query(ctx, query).await
148 }
149 } else {
150 self.grpc_query(ctx, query).await
151 }
152 }
153}
154
155impl GreptimeDB {
156 async fn stop(&self) {
157 if self.delete_namespace_on_stop {
158 self.resources_manager.delete_namespace().await;
159 println!("Deleted namespace({})", self.resources_manager.namespace());
160 } else {
161 println!(
162 "Namespace({}) is not deleted",
163 self.resources_manager.namespace()
164 );
165 }
166 }
167}
168
/// [`ResourcesManager`] that deletes its namespace by shelling out to
/// `kubectl`.
pub struct NaiveResourcesManager {
    /// Name of the namespace this manager reports and deletes.
    namespace: String,
}
172
173impl NaiveResourcesManager {
174 pub fn new(namespace: String) -> Self {
175 Self { namespace }
176 }
177}
178
179#[async_trait]
180impl ResourcesManager for NaiveResourcesManager {
181 async fn delete_namespace(&self) {
182 let output = Command::new("kubectl")
183 .arg("delete")
184 .arg("namespace")
185 .arg(&self.namespace)
186 .output()
187 .await
188 .unwrap_or_else(|e| {
189 panic!(
190 "Failed to execute kubectl delete namespace({}): {}",
191 self.namespace, e
192 )
193 });
194
195 if !output.status.success() {
196 let stderr = String::from_utf8_lossy(&output.stderr);
197 panic!("Failed to delete namespace({}): {}", self.namespace, stderr);
198 }
199 }
200
201 fn namespace(&self) -> &str {
202 &self.namespace
203 }
204}