1use std::collections::{HashMap, HashSet};
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::cmd::bare::ServerAddr;
23use crate::env::bare::{Env, GreptimeDBContext, ServiceProvider};
24use crate::util;
25
/// Log-level flag added to every generated command line, placed after the
/// environment's extra arguments and before the mode name and `start`.
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
27
/// Process-wide registry of the ports already handed out by this module.
static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();

/// Returns the shared port registry, creating an empty one on first access.
fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
    USED_PORTS.get_or_init(Mutex::default)
}
33
34fn get_unique_random_port() -> u16 {
35 const MAX_ATTEMPTS: usize = 100;
37
38 for _ in 0..MAX_ATTEMPTS {
39 let p = util::get_random_port();
40 let mut used = get_used_ports().lock().unwrap();
41 if !used.contains(&p) {
42 used.insert(p);
43 return p;
44 }
45 }
46
47 panic!(
48 "Failed to find an unused port after {} attempts",
49 MAX_ATTEMPTS
50 );
51}
52
/// The kind of server process to launch, together with the addresses it uses.
///
/// The `random_*` constructors fill every address with `127.0.0.1:<port>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerMode {
    /// A single process serving HTTP, gRPC, MySQL and PostgreSQL.
    Standalone {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
    },
    /// A frontend node that connects to a metasrv.
    Frontend {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as both `--rpc-addr` and `--rpc-server-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
    },
    /// A metasrv node.
    Metasrv {
        /// Passed as `--bind-addr`.
        rpc_bind_addr: String,
        /// Passed as `--server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
    },
    /// A datanode identified by `node_id`.
    Datanode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
    /// A flownode identified by `node_id`.
    Flownode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
}
88
/// Values handed to the `<mode>-test.toml.template` template when a server's
/// config file is rendered.
#[derive(Serialize)]
struct ConfigContext {
    /// `<data_home>/wal`.
    wal_dir: String,
    /// `<sqlness_home>/greptimedb-<id>-<mode name>`.
    data_home: String,
    /// `<data_home>/procedure`.
    procedure_dir: String,
    /// Taken from `GreptimeDBContext::is_raft_engine`.
    is_raft_engine: bool,
    /// Taken from `GreptimeDBContext::kafka_wal_broker_endpoints`.
    kafka_wal_broker_endpoints: String,
    /// `true` when the store configuration lists at least one store address.
    use_etcd: bool,
    /// The store addresses, each wrapped in double quotes and joined with `,`.
    store_addrs: String,
    /// The instance id the config file is generated for.
    instance_id: usize,
    /// Mode-specific named addresses (for example `grpc_addr`); may be empty.
    addrs: HashMap<String, String>,
    /// Copied from the store configuration's `enable_flat_format`.
    enable_flat_format: bool,
}
103
104impl ServerMode {
105 pub fn random_standalone() -> Self {
106 let http_port = get_unique_random_port();
107 let rpc_port = get_unique_random_port();
108 let mysql_port = get_unique_random_port();
109 let postgres_port = get_unique_random_port();
110
111 ServerMode::Standalone {
112 http_addr: format!("127.0.0.1:{http_port}"),
113 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
114 mysql_addr: format!("127.0.0.1:{mysql_port}"),
115 postgres_addr: format!("127.0.0.1:{postgres_port}"),
116 }
117 }
118
119 pub fn random_frontend(metasrv_port: u16) -> Self {
120 let http_port = get_unique_random_port();
121 let rpc_port = get_unique_random_port();
122 let mysql_port = get_unique_random_port();
123 let postgres_port = get_unique_random_port();
124
125 ServerMode::Frontend {
126 http_addr: format!("127.0.0.1:{http_port}"),
127 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
128 mysql_addr: format!("127.0.0.1:{mysql_port}"),
129 postgres_addr: format!("127.0.0.1:{postgres_port}"),
130 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
131 }
132 }
133
134 pub fn random_metasrv() -> Self {
135 let bind_port = get_unique_random_port();
136 let http_port = get_unique_random_port();
137
138 ServerMode::Metasrv {
139 rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
140 rpc_server_addr: format!("127.0.0.1:{bind_port}"),
141 http_addr: format!("127.0.0.1:{http_port}"),
142 }
143 }
144
145 pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
146 let rpc_port = get_unique_random_port();
147 let http_port = get_unique_random_port();
148
149 ServerMode::Datanode {
150 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
151 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
152 http_addr: format!("127.0.0.1:{http_port}"),
153 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
154 node_id,
155 }
156 }
157
158 pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
159 let rpc_port = get_unique_random_port();
160 let http_port = get_unique_random_port();
161
162 ServerMode::Flownode {
163 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
164 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
165 http_addr: format!("127.0.0.1:{http_port}"),
166 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
167 node_id,
168 }
169 }
170
171 pub fn name(&self) -> &'static str {
172 match self {
173 ServerMode::Standalone { .. } => "standalone",
174 ServerMode::Frontend { .. } => "frontend",
175 ServerMode::Metasrv { .. } => "metasrv",
176 ServerMode::Datanode { .. } => "datanode",
177 ServerMode::Flownode { .. } => "flownode",
178 }
179 }
180
181 pub fn check_addrs(&self) -> Vec<String> {
183 match self {
184 ServerMode::Standalone {
185 rpc_bind_addr,
186 mysql_addr,
187 postgres_addr,
188 http_addr,
189 ..
190 } => {
191 vec![
192 rpc_bind_addr.clone(),
193 mysql_addr.clone(),
194 postgres_addr.clone(),
195 http_addr.clone(),
196 ]
197 }
198 ServerMode::Frontend {
199 rpc_bind_addr,
200 mysql_addr,
201 postgres_addr,
202 ..
203 } => {
204 vec![
205 rpc_bind_addr.clone(),
206 mysql_addr.clone(),
207 postgres_addr.clone(),
208 ]
209 }
210 ServerMode::Metasrv { rpc_bind_addr, .. } => {
211 vec![rpc_bind_addr.clone()]
212 }
213 ServerMode::Datanode { rpc_bind_addr, .. } => {
214 vec![rpc_bind_addr.clone()]
215 }
216 ServerMode::Flownode { rpc_bind_addr, .. } => {
217 vec![rpc_bind_addr.clone()]
218 }
219 }
220 }
221
222 pub fn server_addr(&self) -> Option<ServerAddr> {
224 match self {
225 ServerMode::Standalone {
226 rpc_bind_addr,
227 mysql_addr,
228 postgres_addr,
229 ..
230 } => Some(ServerAddr {
231 server_addr: Some(rpc_bind_addr.clone()),
232 pg_server_addr: Some(postgres_addr.clone()),
233 mysql_server_addr: Some(mysql_addr.clone()),
234 }),
235 ServerMode::Frontend {
236 rpc_bind_addr,
237 mysql_addr,
238 postgres_addr,
239 ..
240 } => Some(ServerAddr {
241 server_addr: Some(rpc_bind_addr.clone()),
242 pg_server_addr: Some(postgres_addr.clone()),
243 mysql_server_addr: Some(mysql_addr.clone()),
244 }),
245 _ => None,
246 }
247 }
248
249 pub fn generate_config_file(
250 &self,
251 sqlness_home: &Path,
252 db_ctx: &GreptimeDBContext,
253 id: usize,
254 ) -> String {
255 let mut tt = TinyTemplate::new();
256
257 let mut path = util::sqlness_conf_path();
258 path.push(format!("{}-test.toml.template", self.name()));
259 let template = std::fs::read_to_string(&path)
260 .unwrap_or_else(|e| panic!("read file '{}' error: {e}", path.display()));
261 tt.add_template(self.name(), &template).unwrap();
262
263 let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
264 std::fs::create_dir_all(data_home.as_path()).unwrap();
265
266 let wal_dir = data_home.join("wal").display().to_string();
267 let procedure_dir = data_home.join("procedure").display().to_string();
268
269 let addrs: HashMap<String, String> = match self {
271 ServerMode::Standalone {
272 rpc_bind_addr,
273 mysql_addr,
274 postgres_addr,
275 http_addr,
276 } => [
277 ("http_addr".to_string(), http_addr.clone()),
278 ("grpc_addr".to_string(), rpc_bind_addr.clone()),
279 ("mysql_addr".to_string(), mysql_addr.clone()),
280 ("postgres_addr".to_string(), postgres_addr.clone()),
281 ]
282 .into(),
283 ServerMode::Frontend { rpc_bind_addr, .. } => {
284 [("grpc_addr".to_string(), rpc_bind_addr.clone())].into()
285 }
286 ServerMode::Datanode { metasrv_addr, .. } => {
287 [("metasrv_addr".to_string(), metasrv_addr.clone())].into()
288 }
289 _ => HashMap::new(),
290 };
291
292 let ctx = ConfigContext {
293 wal_dir,
294 data_home: data_home.display().to_string(),
295 procedure_dir,
296 is_raft_engine: db_ctx.is_raft_engine(),
297 kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
298 use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
299 store_addrs: db_ctx
300 .store_config()
301 .store_addrs
302 .iter()
303 .map(|p| format!("\"{p}\""))
304 .collect::<Vec<_>>()
305 .join(","),
306 instance_id: id,
307 addrs,
308 enable_flat_format: db_ctx.store_config().enable_flat_format,
309 };
310
311 let rendered = tt.render(self.name(), &ctx).unwrap();
312
313 let conf_file = data_home
314 .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
315 .display()
316 .to_string();
317 println!(
318 "Generating id {}, {} config file in {conf_file}",
319 id,
320 self.name()
321 );
322 std::fs::write(&conf_file, rendered).unwrap();
323
324 conf_file
325 }
326
327 pub fn get_args(
328 &self,
329 sqlness_home: &Path,
330 env: &Env,
331 db_ctx: &GreptimeDBContext,
332 id: usize,
333 ) -> Vec<String> {
334 let mut args = env
335 .extra_args()
336 .iter()
337 .map(String::as_str)
338 .chain([DEFAULT_LOG_LEVEL, self.name(), "start"])
339 .map(ToString::to_string)
340 .collect::<Vec<String>>();
341
342 match self {
343 ServerMode::Standalone {
344 http_addr,
345 rpc_bind_addr,
346 mysql_addr,
347 postgres_addr,
348 } => {
349 args.extend([
350 format!(
351 "--log-dir={}/greptimedb-{}-standalone/logs",
352 sqlness_home.display(),
353 id
354 ),
355 "-c".to_string(),
356 self.generate_config_file(sqlness_home, db_ctx, id),
357 format!("--http-addr={http_addr}"),
358 format!("--rpc-addr={rpc_bind_addr}"),
359 format!("--mysql-addr={mysql_addr}"),
360 format!("--postgres-addr={postgres_addr}"),
361 ]);
362 }
363 ServerMode::Frontend {
364 http_addr,
365 rpc_bind_addr,
366 mysql_addr,
367 postgres_addr,
368 metasrv_addr,
369 } => {
370 args.extend([
371 format!("--metasrv-addrs={metasrv_addr}"),
372 format!("--http-addr={http_addr}"),
373 format!("--rpc-addr={rpc_bind_addr}"),
374 format!("--rpc-server-addr={rpc_bind_addr}"),
377 format!("--mysql-addr={mysql_addr}"),
378 format!("--postgres-addr={postgres_addr}"),
379 format!(
380 "--log-dir={}/greptimedb-{}-frontend/logs",
381 sqlness_home.display(),
382 id
383 ),
384 "-c".to_string(),
385 self.generate_config_file(sqlness_home, db_ctx, id),
386 ]);
387 }
388 ServerMode::Metasrv {
389 rpc_bind_addr,
390 rpc_server_addr,
391 http_addr,
392 } => {
393 args.extend([
394 "--bind-addr".to_string(),
395 rpc_bind_addr.clone(),
396 "--server-addr".to_string(),
397 rpc_server_addr.clone(),
398 "--enable-region-failover".to_string(),
399 "false".to_string(),
400 format!("--http-addr={http_addr}"),
401 format!(
402 "--log-dir={}/greptimedb-{}-metasrv/logs",
403 sqlness_home.display(),
404 id
405 ),
406 "-c".to_string(),
407 self.generate_config_file(sqlness_home, db_ctx, id),
408 ]);
409
410 if matches!(
411 db_ctx.store_config().setup_pg,
412 Some(ServiceProvider::Create)
413 ) {
414 let client_ports = db_ctx
415 .store_config()
416 .store_addrs
417 .iter()
418 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
419 .collect::<Vec<_>>();
420 let client_port = client_ports.first().unwrap_or(&5432);
421 let pg_server_addr = format!(
422 "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
423 client_port
424 );
425 args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
426 args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
427 } else if let Some(ServiceProvider::External(connection_string)) =
428 db_ctx.store_config().setup_pg
429 {
430 println!("Using external PostgreSQL '{connection_string}' as Kvbackend");
431 args.extend([
432 "--backend".to_string(),
433 "postgres-store".to_string(),
434 "--store-addrs".to_string(),
435 connection_string,
436 ]);
437 } else if matches!(
438 db_ctx.store_config().setup_mysql,
439 Some(ServiceProvider::Create)
440 ) {
441 let client_ports = db_ctx
442 .store_config()
443 .store_addrs
444 .iter()
445 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
446 .collect::<Vec<_>>();
447 let client_port = client_ports.first().unwrap_or(&3306);
448 let mysql_server_addr =
449 format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
450 args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
451 args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
452 } else if let Some(ServiceProvider::External(connection_string)) =
453 db_ctx.store_config().setup_mysql
454 {
455 println!("Using external MySQL '{connection_string}' as Kvbackend");
456 args.extend([
457 "--backend".to_string(),
458 "mysql-store".to_string(),
459 "--store-addrs".to_string(),
460 connection_string,
461 ]);
462 } else if db_ctx.store_config().store_addrs.is_empty() {
463 args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
464 }
465 }
466 ServerMode::Datanode {
467 rpc_bind_addr,
468 rpc_server_addr,
469 http_addr,
470 metasrv_addr,
471 node_id,
472 } => {
473 let data_home = sqlness_home.join(format!(
474 "greptimedb_{}_datanode_{}_{node_id}",
475 id,
476 db_ctx.time()
477 ));
478 args.extend([
479 format!("--rpc-addr={rpc_bind_addr}"),
480 format!("--rpc-server-addr={rpc_server_addr}"),
481 format!("--http-addr={http_addr}"),
482 format!("--data-home={}", data_home.display()),
483 format!("--log-dir={}/logs", data_home.display()),
484 format!("--node-id={node_id}"),
485 "-c".to_string(),
486 self.generate_config_file(sqlness_home, db_ctx, id),
487 format!("--metasrv-addrs={metasrv_addr}"),
488 ]);
489 }
490 ServerMode::Flownode {
491 rpc_bind_addr,
492 rpc_server_addr,
493 http_addr,
494 metasrv_addr,
495 node_id,
496 } => {
497 args.extend([
498 format!("--rpc-addr={rpc_bind_addr}"),
499 format!("--rpc-server-addr={rpc_server_addr}"),
500 format!("--node-id={node_id}"),
501 format!(
502 "--log-dir={}/greptimedb-{}-flownode/logs",
503 sqlness_home.display(),
504 id
505 ),
506 format!("--metasrv-addrs={metasrv_addr}"),
507 format!("--http-addr={http_addr}"),
508 ]);
509 }
510 }
511
512 args
513 }
514}