1use std::collections::HashSet;
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::cmd::bare::ServerAddr;
23use crate::env::bare::{Env, GreptimeDBContext, ServiceProvider};
24use crate::util;
25
26const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
27
28static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();
29
30fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
31 USED_PORTS.get_or_init(|| Mutex::new(HashSet::new()))
32}
33
34fn get_unique_random_port() -> u16 {
35 const MAX_ATTEMPTS: usize = 100;
37
38 for _ in 0..MAX_ATTEMPTS {
39 let p = util::get_random_port();
40 let mut used = get_used_ports().lock().unwrap();
41 if !used.contains(&p) {
42 used.insert(p);
43 return p;
44 }
45 }
46
47 panic!(
48 "Failed to find an unused port after {} attempts",
49 MAX_ATTEMPTS
50 );
51}
52
53#[derive(Clone)]
54pub enum ServerMode {
55 Standalone {
56 http_addr: String,
57 rpc_bind_addr: String,
58 mysql_addr: String,
59 postgres_addr: String,
60 },
61 Frontend {
62 http_addr: String,
63 rpc_bind_addr: String,
64 mysql_addr: String,
65 postgres_addr: String,
66 metasrv_addr: String,
67 },
68 Metasrv {
69 rpc_bind_addr: String,
70 rpc_server_addr: String,
71 http_addr: String,
72 },
73 Datanode {
74 rpc_bind_addr: String,
75 rpc_server_addr: String,
76 http_addr: String,
77 metasrv_addr: String,
78 node_id: u32,
79 },
80 Flownode {
81 rpc_bind_addr: String,
82 rpc_server_addr: String,
83 http_addr: String,
84 metasrv_addr: String,
85 node_id: u32,
86 },
87}
88
89#[derive(Serialize)]
90struct ConfigContext {
91 wal_dir: String,
92 data_home: String,
93 procedure_dir: String,
94 is_raft_engine: bool,
95 kafka_wal_broker_endpoints: String,
96 use_etcd: bool,
97 store_addrs: String,
98 instance_id: usize,
99 metasrv_addr: String,
102 grpc_addr: String,
104 mysql_addr: String,
106 postgres_addr: String,
108}
109
110impl ServerMode {
111 pub fn random_standalone() -> Self {
112 let http_port = get_unique_random_port();
113 let rpc_port = get_unique_random_port();
114 let mysql_port = get_unique_random_port();
115 let postgres_port = get_unique_random_port();
116
117 ServerMode::Standalone {
118 http_addr: format!("127.0.0.1:{http_port}"),
119 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
120 mysql_addr: format!("127.0.0.1:{mysql_port}"),
121 postgres_addr: format!("127.0.0.1:{postgres_port}"),
122 }
123 }
124
125 pub fn random_frontend(metasrv_port: u16) -> Self {
126 let http_port = get_unique_random_port();
127 let rpc_port = get_unique_random_port();
128 let mysql_port = get_unique_random_port();
129 let postgres_port = get_unique_random_port();
130
131 ServerMode::Frontend {
132 http_addr: format!("127.0.0.1:{http_port}"),
133 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
134 mysql_addr: format!("127.0.0.1:{mysql_port}"),
135 postgres_addr: format!("127.0.0.1:{postgres_port}"),
136 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
137 }
138 }
139
140 pub fn random_metasrv() -> Self {
141 let bind_port = get_unique_random_port();
142 let http_port = get_unique_random_port();
143
144 ServerMode::Metasrv {
145 rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
146 rpc_server_addr: format!("127.0.0.1:{bind_port}"),
147 http_addr: format!("127.0.0.1:{http_port}"),
148 }
149 }
150
151 pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
152 let rpc_port = get_unique_random_port();
153 let http_port = get_unique_random_port();
154
155 ServerMode::Datanode {
156 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
157 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
158 http_addr: format!("127.0.0.1:{http_port}"),
159 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
160 node_id,
161 }
162 }
163
164 pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
165 let rpc_port = get_unique_random_port();
166 let http_port = get_unique_random_port();
167
168 ServerMode::Flownode {
169 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
170 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
171 http_addr: format!("127.0.0.1:{http_port}"),
172 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
173 node_id,
174 }
175 }
176
177 pub fn name(&self) -> &'static str {
178 match self {
179 ServerMode::Standalone { .. } => "standalone",
180 ServerMode::Frontend { .. } => "frontend",
181 ServerMode::Metasrv { .. } => "metasrv",
182 ServerMode::Datanode { .. } => "datanode",
183 ServerMode::Flownode { .. } => "flownode",
184 }
185 }
186
187 pub fn check_addrs(&self) -> Vec<String> {
189 match self {
190 ServerMode::Standalone {
191 rpc_bind_addr,
192 mysql_addr,
193 postgres_addr,
194 http_addr,
195 ..
196 } => {
197 vec![
198 rpc_bind_addr.clone(),
199 mysql_addr.clone(),
200 postgres_addr.clone(),
201 http_addr.clone(),
202 ]
203 }
204 ServerMode::Frontend {
205 rpc_bind_addr,
206 mysql_addr,
207 postgres_addr,
208 ..
209 } => {
210 vec![
211 rpc_bind_addr.clone(),
212 mysql_addr.clone(),
213 postgres_addr.clone(),
214 ]
215 }
216 ServerMode::Metasrv { rpc_bind_addr, .. } => {
217 vec![rpc_bind_addr.clone()]
218 }
219 ServerMode::Datanode { rpc_bind_addr, .. } => {
220 vec![rpc_bind_addr.clone()]
221 }
222 ServerMode::Flownode { rpc_bind_addr, .. } => {
223 vec![rpc_bind_addr.clone()]
224 }
225 }
226 }
227
228 pub fn server_addr(&self) -> Option<ServerAddr> {
230 match self {
231 ServerMode::Standalone {
232 rpc_bind_addr,
233 mysql_addr,
234 postgres_addr,
235 ..
236 } => Some(ServerAddr {
237 server_addr: Some(rpc_bind_addr.clone()),
238 pg_server_addr: Some(postgres_addr.clone()),
239 mysql_server_addr: Some(mysql_addr.clone()),
240 }),
241 ServerMode::Frontend {
242 rpc_bind_addr,
243 mysql_addr,
244 postgres_addr,
245 ..
246 } => Some(ServerAddr {
247 server_addr: Some(rpc_bind_addr.clone()),
248 pg_server_addr: Some(postgres_addr.clone()),
249 mysql_server_addr: Some(mysql_addr.clone()),
250 }),
251 _ => None,
252 }
253 }
254
255 pub fn generate_config_file(
256 &self,
257 sqlness_home: &Path,
258 db_ctx: &GreptimeDBContext,
259 id: usize,
260 ) -> String {
261 let mut tt = TinyTemplate::new();
262
263 let mut path = util::sqlness_conf_path();
264 path.push(format!("{}-test.toml.template", self.name()));
265 let template = std::fs::read_to_string(&path)
266 .unwrap_or_else(|e| panic!("read file '{}' error: {e}", path.display()));
267 tt.add_template(self.name(), &template).unwrap();
268
269 let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
270 std::fs::create_dir_all(data_home.as_path()).unwrap();
271
272 let wal_dir = data_home.join("wal").display().to_string();
273 let procedure_dir = data_home.join("procedure").display().to_string();
274
275 let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
277 ServerMode::Standalone {
278 rpc_bind_addr,
279 mysql_addr,
280 postgres_addr,
281 ..
282 } => (
283 String::new(),
284 rpc_bind_addr.clone(),
285 mysql_addr.clone(),
286 postgres_addr.clone(),
287 ),
288 ServerMode::Frontend {
289 rpc_bind_addr,
290 mysql_addr,
291 postgres_addr,
292 ..
293 } => (
294 String::new(),
295 rpc_bind_addr.clone(),
296 mysql_addr.clone(),
297 postgres_addr.clone(),
298 ),
299 ServerMode::Datanode {
300 rpc_bind_addr,
301 metasrv_addr,
302 ..
303 } => (
304 metasrv_addr.clone(),
305 rpc_bind_addr.clone(),
306 String::new(),
307 String::new(),
308 ),
309 _ => (String::new(), String::new(), String::new(), String::new()),
310 };
311
312 let ctx = ConfigContext {
313 wal_dir,
314 data_home: data_home.display().to_string(),
315 procedure_dir,
316 is_raft_engine: db_ctx.is_raft_engine(),
317 kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
318 use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
319 store_addrs: db_ctx
320 .store_config()
321 .store_addrs
322 .iter()
323 .map(|p| format!("\"{p}\""))
324 .collect::<Vec<_>>()
325 .join(","),
326 instance_id: id,
327 metasrv_addr,
328 grpc_addr,
329 mysql_addr,
330 postgres_addr,
331 };
332
333 let rendered = tt.render(self.name(), &ctx).unwrap();
334
335 let conf_file = data_home
336 .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
337 .display()
338 .to_string();
339 println!(
340 "Generating id {}, {} config file in {conf_file}",
341 id,
342 self.name()
343 );
344 std::fs::write(&conf_file, rendered).unwrap();
345
346 conf_file
347 }
348
349 pub fn get_args(
350 &self,
351 sqlness_home: &Path,
352 env: &Env,
353 db_ctx: &GreptimeDBContext,
354 id: usize,
355 ) -> Vec<String> {
356 let mut args = env
357 .extra_args()
358 .iter()
359 .map(String::as_str)
360 .chain([DEFAULT_LOG_LEVEL, self.name(), "start"])
361 .map(ToString::to_string)
362 .collect::<Vec<String>>();
363
364 match self {
365 ServerMode::Standalone {
366 http_addr,
367 rpc_bind_addr,
368 mysql_addr,
369 postgres_addr,
370 } => {
371 args.extend([
372 format!(
373 "--log-dir={}/greptimedb-{}-standalone/logs",
374 sqlness_home.display(),
375 id
376 ),
377 "-c".to_string(),
378 self.generate_config_file(sqlness_home, db_ctx, id),
379 format!("--http-addr={http_addr}"),
380 format!("--rpc-addr={rpc_bind_addr}"),
381 format!("--mysql-addr={mysql_addr}"),
382 format!("--postgres-addr={postgres_addr}"),
383 ]);
384 }
385 ServerMode::Frontend {
386 http_addr,
387 rpc_bind_addr,
388 mysql_addr,
389 postgres_addr,
390 metasrv_addr,
391 } => {
392 args.extend([
393 format!("--metasrv-addrs={metasrv_addr}"),
394 format!("--http-addr={http_addr}"),
395 format!("--rpc-addr={rpc_bind_addr}"),
396 format!("--rpc-server-addr={rpc_bind_addr}"),
399 format!("--mysql-addr={mysql_addr}"),
400 format!("--postgres-addr={postgres_addr}"),
401 format!(
402 "--log-dir={}/greptimedb-{}-frontend/logs",
403 sqlness_home.display(),
404 id
405 ),
406 "-c".to_string(),
407 self.generate_config_file(sqlness_home, db_ctx, id),
408 ]);
409 }
410 ServerMode::Metasrv {
411 rpc_bind_addr,
412 rpc_server_addr,
413 http_addr,
414 } => {
415 args.extend([
416 "--bind-addr".to_string(),
417 rpc_bind_addr.clone(),
418 "--server-addr".to_string(),
419 rpc_server_addr.clone(),
420 "--enable-region-failover".to_string(),
421 "false".to_string(),
422 format!("--http-addr={http_addr}"),
423 format!(
424 "--log-dir={}/greptimedb-{}-metasrv/logs",
425 sqlness_home.display(),
426 id
427 ),
428 "-c".to_string(),
429 self.generate_config_file(sqlness_home, db_ctx, id),
430 ]);
431
432 if matches!(
433 db_ctx.store_config().setup_pg,
434 Some(ServiceProvider::Create)
435 ) {
436 let client_ports = db_ctx
437 .store_config()
438 .store_addrs
439 .iter()
440 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
441 .collect::<Vec<_>>();
442 let client_port = client_ports.first().unwrap_or(&5432);
443 let pg_server_addr = format!(
444 "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
445 client_port
446 );
447 args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
448 args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
449 } else if let Some(ServiceProvider::External(connection_string)) =
450 db_ctx.store_config().setup_pg
451 {
452 println!("Using external PostgreSQL '{connection_string}' as Kvbackend");
453 args.extend([
454 "--backend".to_string(),
455 "postgres-store".to_string(),
456 "--store-addrs".to_string(),
457 connection_string,
458 ]);
459 } else if matches!(
460 db_ctx.store_config().setup_mysql,
461 Some(ServiceProvider::Create)
462 ) {
463 let client_ports = db_ctx
464 .store_config()
465 .store_addrs
466 .iter()
467 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
468 .collect::<Vec<_>>();
469 let client_port = client_ports.first().unwrap_or(&3306);
470 let mysql_server_addr =
471 format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
472 args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
473 args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
474 } else if let Some(ServiceProvider::External(connection_string)) =
475 db_ctx.store_config().setup_mysql
476 {
477 println!("Using external MySQL '{connection_string}' as Kvbackend");
478 args.extend([
479 "--backend".to_string(),
480 "mysql-store".to_string(),
481 "--store-addrs".to_string(),
482 connection_string,
483 ]);
484 } else if db_ctx.store_config().store_addrs.is_empty() {
485 args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
486 }
487 }
488 ServerMode::Datanode {
489 rpc_bind_addr,
490 rpc_server_addr,
491 http_addr,
492 metasrv_addr,
493 node_id,
494 } => {
495 let data_home = sqlness_home.join(format!(
496 "greptimedb_{}_datanode_{}_{node_id}",
497 id,
498 db_ctx.time()
499 ));
500 args.extend([
501 format!("--rpc-addr={rpc_bind_addr}"),
502 format!("--rpc-server-addr={rpc_server_addr}"),
503 format!("--http-addr={http_addr}"),
504 format!("--data-home={}", data_home.display()),
505 format!("--log-dir={}/logs", data_home.display()),
506 format!("--node-id={node_id}"),
507 "-c".to_string(),
508 self.generate_config_file(sqlness_home, db_ctx, id),
509 format!("--metasrv-addrs={metasrv_addr}"),
510 ]);
511 }
512 ServerMode::Flownode {
513 rpc_bind_addr,
514 rpc_server_addr,
515 http_addr,
516 metasrv_addr,
517 node_id,
518 } => {
519 args.extend([
520 format!("--rpc-addr={rpc_bind_addr}"),
521 format!("--rpc-server-addr={rpc_server_addr}"),
522 format!("--node-id={node_id}"),
523 format!(
524 "--log-dir={}/greptimedb-{}-flownode/logs",
525 sqlness_home.display(),
526 id
527 ),
528 format!("--metasrv-addrs={metasrv_addr}"),
529 format!("--http-addr={http_addr}"),
530 ]);
531 }
532 }
533
534 args
535 }
536}