1use std::collections::HashSet;
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::env::{Env, GreptimeDBContext};
23use crate::{util, ServerAddr};
24
25const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
26
27static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();
28
29fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
30 USED_PORTS.get_or_init(|| Mutex::new(HashSet::new()))
31}
32
33fn get_unique_random_port() -> u16 {
34 const MAX_ATTEMPTS: usize = 100;
36
37 for _ in 0..MAX_ATTEMPTS {
38 let p = util::get_random_port();
39 let mut used = get_used_ports().lock().unwrap();
40 if !used.contains(&p) {
41 used.insert(p);
42 return p;
43 }
44 }
45
46 panic!(
47 "Failed to find an unused port after {} attempts",
48 MAX_ATTEMPTS
49 );
50}
51
52#[derive(Clone)]
53pub enum ServerMode {
54 Standalone {
55 http_addr: String,
56 rpc_bind_addr: String,
57 mysql_addr: String,
58 postgres_addr: String,
59 },
60 Frontend {
61 http_addr: String,
62 rpc_bind_addr: String,
63 mysql_addr: String,
64 postgres_addr: String,
65 metasrv_addr: String,
66 },
67 Metasrv {
68 rpc_bind_addr: String,
69 rpc_server_addr: String,
70 http_addr: String,
71 },
72 Datanode {
73 rpc_bind_addr: String,
74 rpc_server_addr: String,
75 http_addr: String,
76 metasrv_addr: String,
77 node_id: u32,
78 },
79 Flownode {
80 rpc_bind_addr: String,
81 rpc_server_addr: String,
82 http_addr: String,
83 metasrv_addr: String,
84 node_id: u32,
85 },
86}
87
88#[derive(Serialize)]
89struct ConfigContext {
90 wal_dir: String,
91 data_home: String,
92 procedure_dir: String,
93 is_raft_engine: bool,
94 kafka_wal_broker_endpoints: String,
95 use_etcd: bool,
96 store_addrs: String,
97 instance_id: usize,
98 metasrv_addr: String,
101 grpc_addr: String,
103 mysql_addr: String,
105 postgres_addr: String,
107}
108
109impl ServerMode {
110 pub fn random_standalone() -> Self {
111 let http_port = get_unique_random_port();
112 let rpc_port = get_unique_random_port();
113 let mysql_port = get_unique_random_port();
114 let postgres_port = get_unique_random_port();
115
116 ServerMode::Standalone {
117 http_addr: format!("127.0.0.1:{http_port}"),
118 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
119 mysql_addr: format!("127.0.0.1:{mysql_port}"),
120 postgres_addr: format!("127.0.0.1:{postgres_port}"),
121 }
122 }
123
124 pub fn random_frontend(metasrv_port: u16) -> Self {
125 let http_port = get_unique_random_port();
126 let rpc_port = get_unique_random_port();
127 let mysql_port = get_unique_random_port();
128 let postgres_port = get_unique_random_port();
129
130 ServerMode::Frontend {
131 http_addr: format!("127.0.0.1:{http_port}"),
132 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
133 mysql_addr: format!("127.0.0.1:{mysql_port}"),
134 postgres_addr: format!("127.0.0.1:{postgres_port}"),
135 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
136 }
137 }
138
139 pub fn random_metasrv() -> Self {
140 let bind_port = get_unique_random_port();
141 let http_port = get_unique_random_port();
142
143 ServerMode::Metasrv {
144 rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
145 rpc_server_addr: format!("127.0.0.1:{bind_port}"),
146 http_addr: format!("127.0.0.1:{http_port}"),
147 }
148 }
149
150 pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
151 let rpc_port = get_unique_random_port();
152 let http_port = get_unique_random_port();
153
154 ServerMode::Datanode {
155 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
156 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
157 http_addr: format!("127.0.0.1:{http_port}"),
158 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
159 node_id,
160 }
161 }
162
163 pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
164 let rpc_port = get_unique_random_port();
165 let http_port = get_unique_random_port();
166
167 ServerMode::Flownode {
168 rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
169 rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
170 http_addr: format!("127.0.0.1:{http_port}"),
171 metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
172 node_id,
173 }
174 }
175
176 pub fn name(&self) -> &'static str {
177 match self {
178 ServerMode::Standalone { .. } => "standalone",
179 ServerMode::Frontend { .. } => "frontend",
180 ServerMode::Metasrv { .. } => "metasrv",
181 ServerMode::Datanode { .. } => "datanode",
182 ServerMode::Flownode { .. } => "flownode",
183 }
184 }
185
186 pub fn check_addrs(&self) -> Vec<String> {
188 match self {
189 ServerMode::Standalone {
190 rpc_bind_addr,
191 mysql_addr,
192 postgres_addr,
193 http_addr,
194 ..
195 } => {
196 vec![
197 rpc_bind_addr.clone(),
198 mysql_addr.clone(),
199 postgres_addr.clone(),
200 http_addr.clone(),
201 ]
202 }
203 ServerMode::Frontend {
204 rpc_bind_addr,
205 mysql_addr,
206 postgres_addr,
207 ..
208 } => {
209 vec![
210 rpc_bind_addr.clone(),
211 mysql_addr.clone(),
212 postgres_addr.clone(),
213 ]
214 }
215 ServerMode::Metasrv { rpc_bind_addr, .. } => {
216 vec![rpc_bind_addr.clone()]
217 }
218 ServerMode::Datanode { rpc_bind_addr, .. } => {
219 vec![rpc_bind_addr.clone()]
220 }
221 ServerMode::Flownode { rpc_bind_addr, .. } => {
222 vec![rpc_bind_addr.clone()]
223 }
224 }
225 }
226
227 pub fn server_addr(&self) -> Option<ServerAddr> {
229 match self {
230 ServerMode::Standalone {
231 rpc_bind_addr,
232 mysql_addr,
233 postgres_addr,
234 ..
235 } => Some(ServerAddr {
236 server_addr: Some(rpc_bind_addr.clone()),
237 pg_server_addr: Some(postgres_addr.clone()),
238 mysql_server_addr: Some(mysql_addr.clone()),
239 }),
240 ServerMode::Frontend {
241 rpc_bind_addr,
242 mysql_addr,
243 postgres_addr,
244 ..
245 } => Some(ServerAddr {
246 server_addr: Some(rpc_bind_addr.clone()),
247 pg_server_addr: Some(postgres_addr.clone()),
248 mysql_server_addr: Some(mysql_addr.clone()),
249 }),
250 _ => None,
251 }
252 }
253
254 pub fn generate_config_file(
255 &self,
256 sqlness_home: &Path,
257 db_ctx: &GreptimeDBContext,
258 id: usize,
259 ) -> String {
260 let mut tt = TinyTemplate::new();
261
262 let mut path = util::sqlness_conf_path();
263 path.push(format!("{}-test.toml.template", self.name()));
264 let template = std::fs::read_to_string(path).unwrap();
265 tt.add_template(self.name(), &template).unwrap();
266
267 let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
268 std::fs::create_dir_all(data_home.as_path()).unwrap();
269
270 let wal_dir = data_home.join("wal").display().to_string();
271 let procedure_dir = data_home.join("procedure").display().to_string();
272
273 let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
275 ServerMode::Standalone {
276 rpc_bind_addr,
277 mysql_addr,
278 postgres_addr,
279 ..
280 } => (
281 String::new(),
282 rpc_bind_addr.clone(),
283 mysql_addr.clone(),
284 postgres_addr.clone(),
285 ),
286 ServerMode::Frontend {
287 rpc_bind_addr,
288 mysql_addr,
289 postgres_addr,
290 ..
291 } => (
292 String::new(),
293 rpc_bind_addr.clone(),
294 mysql_addr.clone(),
295 postgres_addr.clone(),
296 ),
297 ServerMode::Datanode {
298 rpc_bind_addr,
299 metasrv_addr,
300 ..
301 } => (
302 metasrv_addr.clone(),
303 rpc_bind_addr.clone(),
304 String::new(),
305 String::new(),
306 ),
307 _ => (String::new(), String::new(), String::new(), String::new()),
308 };
309
310 let ctx = ConfigContext {
311 wal_dir,
312 data_home: data_home.display().to_string(),
313 procedure_dir,
314 is_raft_engine: db_ctx.is_raft_engine(),
315 kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
316 use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
317 store_addrs: db_ctx
318 .store_config()
319 .store_addrs
320 .iter()
321 .map(|p| format!("\"{p}\""))
322 .collect::<Vec<_>>()
323 .join(","),
324 instance_id: id,
325 metasrv_addr,
326 grpc_addr,
327 mysql_addr,
328 postgres_addr,
329 };
330
331 let rendered = tt.render(self.name(), &ctx).unwrap();
332
333 let conf_file = data_home
334 .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
335 .display()
336 .to_string();
337 println!(
338 "Generating id {}, {} config file in {conf_file}",
339 id,
340 self.name()
341 );
342 std::fs::write(&conf_file, rendered).unwrap();
343
344 conf_file
345 }
346
347 pub fn get_args(
348 &self,
349 sqlness_home: &Path,
350 _env: &Env,
351 db_ctx: &GreptimeDBContext,
352 id: usize,
353 ) -> Vec<String> {
354 let mut args = vec![
355 DEFAULT_LOG_LEVEL.to_string(),
356 self.name().to_string(),
357 "start".to_string(),
358 ];
359
360 match self {
361 ServerMode::Standalone {
362 http_addr,
363 rpc_bind_addr,
364 mysql_addr,
365 postgres_addr,
366 } => {
367 args.extend([
368 format!(
369 "--log-dir={}/greptimedb-{}-standalone/logs",
370 sqlness_home.display(),
371 id
372 ),
373 "-c".to_string(),
374 self.generate_config_file(sqlness_home, db_ctx, id),
375 format!("--http-addr={http_addr}"),
376 format!("--rpc-addr={rpc_bind_addr}"),
377 format!("--mysql-addr={mysql_addr}"),
378 format!("--postgres-addr={postgres_addr}"),
379 ]);
380 }
381 ServerMode::Frontend {
382 http_addr,
383 rpc_bind_addr,
384 mysql_addr,
385 postgres_addr,
386 metasrv_addr,
387 } => {
388 args.extend([
389 format!("--metasrv-addrs={metasrv_addr}"),
390 format!("--http-addr={http_addr}"),
391 format!("--rpc-addr={rpc_bind_addr}"),
392 format!("--rpc-server-addr={rpc_bind_addr}"),
395 format!("--mysql-addr={mysql_addr}"),
396 format!("--postgres-addr={postgres_addr}"),
397 format!(
398 "--log-dir={}/greptimedb-{}-frontend/logs",
399 sqlness_home.display(),
400 id
401 ),
402 "-c".to_string(),
403 self.generate_config_file(sqlness_home, db_ctx, id),
404 ]);
405 }
406 ServerMode::Metasrv {
407 rpc_bind_addr,
408 rpc_server_addr,
409 http_addr,
410 } => {
411 args.extend([
412 "--bind-addr".to_string(),
413 rpc_bind_addr.clone(),
414 "--server-addr".to_string(),
415 rpc_server_addr.clone(),
416 "--enable-region-failover".to_string(),
417 "false".to_string(),
418 format!("--http-addr={http_addr}"),
419 format!(
420 "--log-dir={}/greptimedb-{}-metasrv/logs",
421 sqlness_home.display(),
422 id
423 ),
424 "-c".to_string(),
425 self.generate_config_file(sqlness_home, db_ctx, id),
426 ]);
427
428 if db_ctx.store_config().setup_pg {
429 let client_ports = db_ctx
430 .store_config()
431 .store_addrs
432 .iter()
433 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
434 .collect::<Vec<_>>();
435 let client_port = client_ports.first().unwrap_or(&5432);
436 let pg_server_addr = format!(
437 "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
438 client_port
439 );
440 args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
441 args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
442 } else if db_ctx.store_config().setup_mysql {
443 let client_ports = db_ctx
444 .store_config()
445 .store_addrs
446 .iter()
447 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
448 .collect::<Vec<_>>();
449 let client_port = client_ports.first().unwrap_or(&3306);
450 let mysql_server_addr =
451 format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
452 args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
453 args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
454 } else if db_ctx.store_config().store_addrs.is_empty() {
455 args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
456 }
457 }
458 ServerMode::Datanode {
459 rpc_bind_addr,
460 rpc_server_addr,
461 http_addr,
462 metasrv_addr,
463 node_id,
464 } => {
465 let data_home = sqlness_home.join(format!(
466 "greptimedb_{}_datanode_{}_{node_id}",
467 id,
468 db_ctx.time()
469 ));
470 args.extend([
471 format!("--rpc-addr={rpc_bind_addr}"),
472 format!("--rpc-server-addr={rpc_server_addr}"),
473 format!("--http-addr={http_addr}"),
474 format!("--data-home={}", data_home.display()),
475 format!("--log-dir={}/logs", data_home.display()),
476 format!("--node-id={node_id}"),
477 "-c".to_string(),
478 self.generate_config_file(sqlness_home, db_ctx, id),
479 format!("--metasrv-addrs={metasrv_addr}"),
480 ]);
481 }
482 ServerMode::Flownode {
483 rpc_bind_addr,
484 rpc_server_addr,
485 http_addr,
486 metasrv_addr,
487 node_id,
488 } => {
489 args.extend([
490 format!("--rpc-addr={rpc_bind_addr}"),
491 format!("--rpc-server-addr={rpc_server_addr}"),
492 format!("--node-id={node_id}"),
493 format!(
494 "--log-dir={}/greptimedb-{}-flownode/logs",
495 sqlness_home.display(),
496 id
497 ),
498 format!("--metasrv-addrs={metasrv_addr}"),
499 format!("--http-addr={http_addr}"),
500 ]);
501 }
502 }
503
504 args
505 }
506}