1use std::collections::HashSet;
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::cmd::bare::ServerAddr;
23use crate::env::bare::{Env, GreptimeDBContext, ServiceProvider};
24use crate::util;
25
/// `--log-level` argument that `ServerMode::get_args` places right after the
/// caller-supplied extra arguments and before the subcommand name.
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
27
/// Process-wide set of the ports that have already been handed out.
///
/// Initialized lazily on the first call to [`get_used_ports`].
static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();

/// Returns the shared port set, creating an empty one on first use.
fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
    USED_PORTS.get_or_init(Mutex::default)
}
33
34fn get_unique_random_port() -> u16 {
35 const MAX_ATTEMPTS: usize = 100;
37
38 for _ in 0..MAX_ATTEMPTS {
39 let p = util::get_random_port();
40 let mut used = get_used_ports().lock().unwrap();
41 if !used.contains(&p) {
42 used.insert(p);
43 return p;
44 }
45 }
46
47 panic!(
48 "Failed to find an unused port after {} attempts",
49 MAX_ATTEMPTS
50 );
51}
52
/// The kind of server process to start, together with the addresses it is
/// started with.
///
/// Every address is stored as a `host:port` string.
#[derive(Clone, Debug)]
pub enum ServerMode {
    /// A standalone server exposing HTTP, RPC, MySQL and PostgreSQL endpoints.
    Standalone {
        http_addr: String,
        rpc_bind_addr: String,
        mysql_addr: String,
        postgres_addr: String,
    },
    /// A frontend exposing HTTP, RPC, MySQL and PostgreSQL endpoints and
    /// pointed at a metasrv.
    Frontend {
        http_addr: String,
        rpc_bind_addr: String,
        mysql_addr: String,
        postgres_addr: String,
        metasrv_addr: String,
    },
    /// A metasrv with separate bind and server RPC addresses and an HTTP
    /// endpoint.
    Metasrv {
        rpc_bind_addr: String,
        rpc_server_addr: String,
        http_addr: String,
    },
    /// A datanode identified by `node_id` and pointed at a metasrv.
    Datanode {
        rpc_bind_addr: String,
        rpc_server_addr: String,
        http_addr: String,
        metasrv_addr: String,
        node_id: u32,
    },
    /// A flownode identified by `node_id` and pointed at a metasrv.
    Flownode {
        rpc_bind_addr: String,
        rpc_server_addr: String,
        http_addr: String,
        metasrv_addr: String,
        node_id: u32,
    },
}
88
/// Values made available to the config-file template rendered by
/// `ServerMode::generate_config_file`.
#[derive(Serialize)]
struct ConfigContext {
    /// `<data_home>/wal`.
    wal_dir: String,
    /// Per-instance data directory created before rendering.
    data_home: String,
    /// `<data_home>/procedure`.
    procedure_dir: String,
    /// Taken from `GreptimeDBContext::is_raft_engine`.
    is_raft_engine: bool,
    /// Taken from `GreptimeDBContext::kafka_wal_broker_endpoints`.
    kafka_wal_broker_endpoints: String,
    /// `true` when the store config lists at least one store address.
    use_etcd: bool,
    /// The store addresses, each wrapped in double quotes and joined with `,`.
    store_addrs: String,
    /// The instance id passed to `generate_config_file`.
    instance_id: usize,
    /// Metasrv address; only filled in for the datanode mode, empty otherwise.
    metasrv_addr: String,
    /// RPC bind address for standalone, frontend and datanode; empty otherwise.
    grpc_addr: String,
    /// MySQL address for standalone and frontend; empty otherwise.
    mysql_addr: String,
    /// PostgreSQL address for standalone and frontend; empty otherwise.
    postgres_addr: String,
    /// Taken from the store config's `enable_flat_format`.
    enable_flat_format: bool,
}
111
112impl ServerMode {
113 pub fn random_standalone() -> Self {
114 let http_port = get_unique_random_port();
115 let rpc_port = get_unique_random_port();
116 let mysql_port = get_unique_random_port();
117 let postgres_port = get_unique_random_port();
118
119 ServerMode::Standalone {
120 http_addr: format!("127.0.0.1:{http_port}"),
121 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
122 mysql_addr: format!("127.0.0.1:{mysql_port}"),
123 postgres_addr: format!("127.0.0.1:{postgres_port}"),
124 }
125 }
126
127 pub fn random_frontend(metasrv_port: u16) -> Self {
128 let http_port = get_unique_random_port();
129 let rpc_port = get_unique_random_port();
130 let mysql_port = get_unique_random_port();
131 let postgres_port = get_unique_random_port();
132
133 ServerMode::Frontend {
134 http_addr: format!("127.0.0.1:{http_port}"),
135 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
136 mysql_addr: format!("127.0.0.1:{mysql_port}"),
137 postgres_addr: format!("127.0.0.1:{postgres_port}"),
138 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
139 }
140 }
141
142 pub fn random_metasrv() -> Self {
143 let bind_port = get_unique_random_port();
144 let http_port = get_unique_random_port();
145
146 ServerMode::Metasrv {
147 rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
148 rpc_server_addr: format!("127.0.0.1:{bind_port}"),
149 http_addr: format!("127.0.0.1:{http_port}"),
150 }
151 }
152
153 pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
154 let rpc_port = get_unique_random_port();
155 let http_port = get_unique_random_port();
156
157 ServerMode::Datanode {
158 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
159 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
160 http_addr: format!("127.0.0.1:{http_port}"),
161 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
162 node_id,
163 }
164 }
165
166 pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
167 let rpc_port = get_unique_random_port();
168 let http_port = get_unique_random_port();
169
170 ServerMode::Flownode {
171 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
172 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
173 http_addr: format!("127.0.0.1:{http_port}"),
174 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
175 node_id,
176 }
177 }
178
179 pub fn name(&self) -> &'static str {
180 match self {
181 ServerMode::Standalone { .. } => "standalone",
182 ServerMode::Frontend { .. } => "frontend",
183 ServerMode::Metasrv { .. } => "metasrv",
184 ServerMode::Datanode { .. } => "datanode",
185 ServerMode::Flownode { .. } => "flownode",
186 }
187 }
188
189 pub fn check_addrs(&self) -> Vec<String> {
191 match self {
192 ServerMode::Standalone {
193 rpc_bind_addr,
194 mysql_addr,
195 postgres_addr,
196 http_addr,
197 ..
198 } => {
199 vec![
200 rpc_bind_addr.clone(),
201 mysql_addr.clone(),
202 postgres_addr.clone(),
203 http_addr.clone(),
204 ]
205 }
206 ServerMode::Frontend {
207 rpc_bind_addr,
208 mysql_addr,
209 postgres_addr,
210 ..
211 } => {
212 vec![
213 rpc_bind_addr.clone(),
214 mysql_addr.clone(),
215 postgres_addr.clone(),
216 ]
217 }
218 ServerMode::Metasrv { rpc_bind_addr, .. } => {
219 vec![rpc_bind_addr.clone()]
220 }
221 ServerMode::Datanode { rpc_bind_addr, .. } => {
222 vec![rpc_bind_addr.clone()]
223 }
224 ServerMode::Flownode { rpc_bind_addr, .. } => {
225 vec![rpc_bind_addr.clone()]
226 }
227 }
228 }
229
230 pub fn server_addr(&self) -> Option<ServerAddr> {
232 match self {
233 ServerMode::Standalone {
234 rpc_bind_addr,
235 mysql_addr,
236 postgres_addr,
237 ..
238 } => Some(ServerAddr {
239 server_addr: Some(rpc_bind_addr.clone()),
240 pg_server_addr: Some(postgres_addr.clone()),
241 mysql_server_addr: Some(mysql_addr.clone()),
242 }),
243 ServerMode::Frontend {
244 rpc_bind_addr,
245 mysql_addr,
246 postgres_addr,
247 ..
248 } => Some(ServerAddr {
249 server_addr: Some(rpc_bind_addr.clone()),
250 pg_server_addr: Some(postgres_addr.clone()),
251 mysql_server_addr: Some(mysql_addr.clone()),
252 }),
253 _ => None,
254 }
255 }
256
257 pub fn generate_config_file(
258 &self,
259 sqlness_home: &Path,
260 db_ctx: &GreptimeDBContext,
261 id: usize,
262 ) -> String {
263 let mut tt = TinyTemplate::new();
264
265 let mut path = util::sqlness_conf_path();
266 path.push(format!("{}-test.toml.template", self.name()));
267 let template = std::fs::read_to_string(&path)
268 .unwrap_or_else(|e| panic!("read file '{}' error: {e}", path.display()));
269 tt.add_template(self.name(), &template).unwrap();
270
271 let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
272 std::fs::create_dir_all(data_home.as_path()).unwrap();
273
274 let wal_dir = data_home.join("wal").display().to_string();
275 let procedure_dir = data_home.join("procedure").display().to_string();
276
277 let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
279 ServerMode::Standalone {
280 rpc_bind_addr,
281 mysql_addr,
282 postgres_addr,
283 ..
284 } => (
285 String::new(),
286 rpc_bind_addr.clone(),
287 mysql_addr.clone(),
288 postgres_addr.clone(),
289 ),
290 ServerMode::Frontend {
291 rpc_bind_addr,
292 mysql_addr,
293 postgres_addr,
294 ..
295 } => (
296 String::new(),
297 rpc_bind_addr.clone(),
298 mysql_addr.clone(),
299 postgres_addr.clone(),
300 ),
301 ServerMode::Datanode {
302 rpc_bind_addr,
303 metasrv_addr,
304 ..
305 } => (
306 metasrv_addr.clone(),
307 rpc_bind_addr.clone(),
308 String::new(),
309 String::new(),
310 ),
311 _ => (String::new(), String::new(), String::new(), String::new()),
312 };
313
314 let ctx = ConfigContext {
315 wal_dir,
316 data_home: data_home.display().to_string(),
317 procedure_dir,
318 is_raft_engine: db_ctx.is_raft_engine(),
319 kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
320 use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
321 store_addrs: db_ctx
322 .store_config()
323 .store_addrs
324 .iter()
325 .map(|p| format!("\"{p}\""))
326 .collect::<Vec<_>>()
327 .join(","),
328 instance_id: id,
329 metasrv_addr,
330 grpc_addr,
331 mysql_addr,
332 postgres_addr,
333 enable_flat_format: db_ctx.store_config().enable_flat_format,
334 };
335
336 let rendered = tt.render(self.name(), &ctx).unwrap();
337
338 let conf_file = data_home
339 .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
340 .display()
341 .to_string();
342 println!(
343 "Generating id {}, {} config file in {conf_file}",
344 id,
345 self.name()
346 );
347 std::fs::write(&conf_file, rendered).unwrap();
348
349 conf_file
350 }
351
352 pub fn get_args(
353 &self,
354 sqlness_home: &Path,
355 env: &Env,
356 db_ctx: &GreptimeDBContext,
357 id: usize,
358 ) -> Vec<String> {
359 let mut args = env
360 .extra_args()
361 .iter()
362 .map(String::as_str)
363 .chain([DEFAULT_LOG_LEVEL, self.name(), "start"])
364 .map(ToString::to_string)
365 .collect::<Vec<String>>();
366
367 match self {
368 ServerMode::Standalone {
369 http_addr,
370 rpc_bind_addr,
371 mysql_addr,
372 postgres_addr,
373 } => {
374 args.extend([
375 format!(
376 "--log-dir={}/greptimedb-{}-standalone/logs",
377 sqlness_home.display(),
378 id
379 ),
380 "-c".to_string(),
381 self.generate_config_file(sqlness_home, db_ctx, id),
382 format!("--http-addr={http_addr}"),
383 format!("--rpc-addr={rpc_bind_addr}"),
384 format!("--mysql-addr={mysql_addr}"),
385 format!("--postgres-addr={postgres_addr}"),
386 ]);
387 }
388 ServerMode::Frontend {
389 http_addr,
390 rpc_bind_addr,
391 mysql_addr,
392 postgres_addr,
393 metasrv_addr,
394 } => {
395 args.extend([
396 format!("--metasrv-addrs={metasrv_addr}"),
397 format!("--http-addr={http_addr}"),
398 format!("--rpc-addr={rpc_bind_addr}"),
399 format!("--rpc-server-addr={rpc_bind_addr}"),
402 format!("--mysql-addr={mysql_addr}"),
403 format!("--postgres-addr={postgres_addr}"),
404 format!(
405 "--log-dir={}/greptimedb-{}-frontend/logs",
406 sqlness_home.display(),
407 id
408 ),
409 "-c".to_string(),
410 self.generate_config_file(sqlness_home, db_ctx, id),
411 ]);
412 }
413 ServerMode::Metasrv {
414 rpc_bind_addr,
415 rpc_server_addr,
416 http_addr,
417 } => {
418 args.extend([
419 "--bind-addr".to_string(),
420 rpc_bind_addr.clone(),
421 "--server-addr".to_string(),
422 rpc_server_addr.clone(),
423 "--enable-region-failover".to_string(),
424 "false".to_string(),
425 format!("--http-addr={http_addr}"),
426 format!(
427 "--log-dir={}/greptimedb-{}-metasrv/logs",
428 sqlness_home.display(),
429 id
430 ),
431 "-c".to_string(),
432 self.generate_config_file(sqlness_home, db_ctx, id),
433 ]);
434
435 if matches!(
436 db_ctx.store_config().setup_pg,
437 Some(ServiceProvider::Create)
438 ) {
439 let client_ports = db_ctx
440 .store_config()
441 .store_addrs
442 .iter()
443 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
444 .collect::<Vec<_>>();
445 let client_port = client_ports.first().unwrap_or(&5432);
446 let pg_server_addr = format!(
447 "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
448 client_port
449 );
450 args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
451 args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
452 } else if let Some(ServiceProvider::External(connection_string)) =
453 db_ctx.store_config().setup_pg
454 {
455 println!("Using external PostgreSQL '{connection_string}' as Kvbackend");
456 args.extend([
457 "--backend".to_string(),
458 "postgres-store".to_string(),
459 "--store-addrs".to_string(),
460 connection_string,
461 ]);
462 } else if matches!(
463 db_ctx.store_config().setup_mysql,
464 Some(ServiceProvider::Create)
465 ) {
466 let client_ports = db_ctx
467 .store_config()
468 .store_addrs
469 .iter()
470 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
471 .collect::<Vec<_>>();
472 let client_port = client_ports.first().unwrap_or(&3306);
473 let mysql_server_addr =
474 format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
475 args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
476 args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
477 } else if let Some(ServiceProvider::External(connection_string)) =
478 db_ctx.store_config().setup_mysql
479 {
480 println!("Using external MySQL '{connection_string}' as Kvbackend");
481 args.extend([
482 "--backend".to_string(),
483 "mysql-store".to_string(),
484 "--store-addrs".to_string(),
485 connection_string,
486 ]);
487 } else if db_ctx.store_config().store_addrs.is_empty() {
488 args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
489 }
490 }
491 ServerMode::Datanode {
492 rpc_bind_addr,
493 rpc_server_addr,
494 http_addr,
495 metasrv_addr,
496 node_id,
497 } => {
498 let data_home = sqlness_home.join(format!(
499 "greptimedb_{}_datanode_{}_{node_id}",
500 id,
501 db_ctx.time()
502 ));
503 args.extend([
504 format!("--rpc-addr={rpc_bind_addr}"),
505 format!("--rpc-server-addr={rpc_server_addr}"),
506 format!("--http-addr={http_addr}"),
507 format!("--data-home={}", data_home.display()),
508 format!("--log-dir={}/logs", data_home.display()),
509 format!("--node-id={node_id}"),
510 "-c".to_string(),
511 self.generate_config_file(sqlness_home, db_ctx, id),
512 format!("--metasrv-addrs={metasrv_addr}"),
513 ]);
514 }
515 ServerMode::Flownode {
516 rpc_bind_addr,
517 rpc_server_addr,
518 http_addr,
519 metasrv_addr,
520 node_id,
521 } => {
522 args.extend([
523 format!("--rpc-addr={rpc_bind_addr}"),
524 format!("--rpc-server-addr={rpc_server_addr}"),
525 format!("--node-id={node_id}"),
526 format!(
527 "--log-dir={}/greptimedb-{}-flownode/logs",
528 sqlness_home.display(),
529 id
530 ),
531 format!("--metasrv-addrs={metasrv_addr}"),
532 format!("--http-addr={http_addr}"),
533 ]);
534 }
535 }
536
537 args
538 }
539}