sqlness_runner/
server_mode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::cmd::bare::ServerAddr;
23use crate::env::bare::{Env, GreptimeDBContext, ServiceProvider};
24use crate::util;
25
/// `--log-level` flag that `ServerMode::get_args` puts on every server command
/// line: `debug` overall, with explicit per-target levels for hyper, tower,
/// datafusion, reqwest, sqlparser, h2 and opendal.
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
27
/// Process-wide registry of the ports that have already been handed out.
/// Created lazily on first access.
static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();

/// Returns the shared port registry, initialising it to an empty set the first
/// time it is requested.
fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
    USED_PORTS.get_or_init(Mutex::default)
}
33
34fn get_unique_random_port() -> u16 {
35    // Tricky loop 100 times to find an unused port instead of infinite loop.
36    const MAX_ATTEMPTS: usize = 100;
37
38    for _ in 0..MAX_ATTEMPTS {
39        let p = util::get_random_port();
40        let mut used = get_used_ports().lock().unwrap();
41        if !used.contains(&p) {
42            used.insert(p);
43            return p;
44        }
45    }
46
47    panic!(
48        "Failed to find an unused port after {} attempts",
49        MAX_ATTEMPTS
50    );
51}
52
/// The kind of server process to launch together with the network addresses
/// that process is started with.
///
/// The `random_*` constructors build each variant with `127.0.0.1` addresses
/// and unique random ports; `get_args` turns a value into a command line.
#[derive(Clone)]
pub enum ServerMode {
    /// Standalone server.
    Standalone {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
    },
    /// Frontend node of a distributed deployment.
    Frontend {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as both `--rpc-addr` and `--rpc-server-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
    },
    /// Metasrv node.
    Metasrv {
        /// Passed as the value of `--bind-addr`.
        rpc_bind_addr: String,
        /// Passed as the value of `--server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
    },
    /// Datanode.
    Datanode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
    /// Flownode.
    Flownode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
}
88
/// Values made available to the `{mode}-test.toml.template` file when
/// `ServerMode::generate_config_file` renders it.
#[derive(Serialize)]
struct ConfigContext {
    // `<data_home>/wal`
    wal_dir: String,
    // per-instance directory `greptimedb-{id}-{mode}` under the sqlness home
    data_home: String,
    // `<data_home>/procedure`
    procedure_dir: String,
    // taken from `GreptimeDBContext::is_raft_engine`
    is_raft_engine: bool,
    // taken from `GreptimeDBContext::kafka_wal_broker_endpoints`
    kafka_wal_broker_endpoints: String,
    // true when the store config lists at least one store address
    use_etcd: bool,
    // store addresses, each wrapped in double quotes and joined with ","
    store_addrs: String,
    // the `id` passed to `generate_config_file`
    instance_id: usize,
    // the following addresses are left empty when the mode does not set them
    // set for datanode only
    metasrv_addr: String,
    // set for standalone, frontend and datanode (their RPC bind address)
    grpc_addr: String,
    // set for standalone and frontend
    mysql_addr: String,
    // set for standalone and frontend
    postgres_addr: String,
}
109
110impl ServerMode {
111    pub fn random_standalone() -> Self {
112        let http_port = get_unique_random_port();
113        let rpc_port = get_unique_random_port();
114        let mysql_port = get_unique_random_port();
115        let postgres_port = get_unique_random_port();
116
117        ServerMode::Standalone {
118            http_addr: format!("127.0.0.1:{http_port}"),
119            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
120            mysql_addr: format!("127.0.0.1:{mysql_port}"),
121            postgres_addr: format!("127.0.0.1:{postgres_port}"),
122        }
123    }
124
125    pub fn random_frontend(metasrv_port: u16) -> Self {
126        let http_port = get_unique_random_port();
127        let rpc_port = get_unique_random_port();
128        let mysql_port = get_unique_random_port();
129        let postgres_port = get_unique_random_port();
130
131        ServerMode::Frontend {
132            http_addr: format!("127.0.0.1:{http_port}"),
133            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
134            mysql_addr: format!("127.0.0.1:{mysql_port}"),
135            postgres_addr: format!("127.0.0.1:{postgres_port}"),
136            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
137        }
138    }
139
140    pub fn random_metasrv() -> Self {
141        let bind_port = get_unique_random_port();
142        let http_port = get_unique_random_port();
143
144        ServerMode::Metasrv {
145            rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
146            rpc_server_addr: format!("127.0.0.1:{bind_port}"),
147            http_addr: format!("127.0.0.1:{http_port}"),
148        }
149    }
150
151    pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
152        let rpc_port = get_unique_random_port();
153        let http_port = get_unique_random_port();
154
155        ServerMode::Datanode {
156            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
157            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
158            http_addr: format!("127.0.0.1:{http_port}"),
159            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
160            node_id,
161        }
162    }
163
164    pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
165        let rpc_port = get_unique_random_port();
166        let http_port = get_unique_random_port();
167
168        ServerMode::Flownode {
169            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
170            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
171            http_addr: format!("127.0.0.1:{http_port}"),
172            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
173            node_id,
174        }
175    }
176
177    pub fn name(&self) -> &'static str {
178        match self {
179            ServerMode::Standalone { .. } => "standalone",
180            ServerMode::Frontend { .. } => "frontend",
181            ServerMode::Metasrv { .. } => "metasrv",
182            ServerMode::Datanode { .. } => "datanode",
183            ServerMode::Flownode { .. } => "flownode",
184        }
185    }
186
187    /// Returns the addresses of the server that needed to be checked.
188    pub fn check_addrs(&self) -> Vec<String> {
189        match self {
190            ServerMode::Standalone {
191                rpc_bind_addr,
192                mysql_addr,
193                postgres_addr,
194                http_addr,
195                ..
196            } => {
197                vec![
198                    rpc_bind_addr.clone(),
199                    mysql_addr.clone(),
200                    postgres_addr.clone(),
201                    http_addr.clone(),
202                ]
203            }
204            ServerMode::Frontend {
205                rpc_bind_addr,
206                mysql_addr,
207                postgres_addr,
208                ..
209            } => {
210                vec![
211                    rpc_bind_addr.clone(),
212                    mysql_addr.clone(),
213                    postgres_addr.clone(),
214                ]
215            }
216            ServerMode::Metasrv { rpc_bind_addr, .. } => {
217                vec![rpc_bind_addr.clone()]
218            }
219            ServerMode::Datanode { rpc_bind_addr, .. } => {
220                vec![rpc_bind_addr.clone()]
221            }
222            ServerMode::Flownode { rpc_bind_addr, .. } => {
223                vec![rpc_bind_addr.clone()]
224            }
225        }
226    }
227
228    /// Returns the server addresses to connect. Only standalone and frontend mode have this.
229    pub fn server_addr(&self) -> Option<ServerAddr> {
230        match self {
231            ServerMode::Standalone {
232                rpc_bind_addr,
233                mysql_addr,
234                postgres_addr,
235                ..
236            } => Some(ServerAddr {
237                server_addr: Some(rpc_bind_addr.clone()),
238                pg_server_addr: Some(postgres_addr.clone()),
239                mysql_server_addr: Some(mysql_addr.clone()),
240            }),
241            ServerMode::Frontend {
242                rpc_bind_addr,
243                mysql_addr,
244                postgres_addr,
245                ..
246            } => Some(ServerAddr {
247                server_addr: Some(rpc_bind_addr.clone()),
248                pg_server_addr: Some(postgres_addr.clone()),
249                mysql_server_addr: Some(mysql_addr.clone()),
250            }),
251            _ => None,
252        }
253    }
254
255    pub fn generate_config_file(
256        &self,
257        sqlness_home: &Path,
258        db_ctx: &GreptimeDBContext,
259        id: usize,
260    ) -> String {
261        let mut tt = TinyTemplate::new();
262
263        let mut path = util::sqlness_conf_path();
264        path.push(format!("{}-test.toml.template", self.name()));
265        let template = std::fs::read_to_string(&path)
266            .unwrap_or_else(|e| panic!("read file '{}' error: {e}", path.display()));
267        tt.add_template(self.name(), &template).unwrap();
268
269        let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
270        std::fs::create_dir_all(data_home.as_path()).unwrap();
271
272        let wal_dir = data_home.join("wal").display().to_string();
273        let procedure_dir = data_home.join("procedure").display().to_string();
274
275        // Get the required addresses based on server mode
276        let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
277            ServerMode::Standalone {
278                rpc_bind_addr,
279                mysql_addr,
280                postgres_addr,
281                ..
282            } => (
283                String::new(),
284                rpc_bind_addr.clone(),
285                mysql_addr.clone(),
286                postgres_addr.clone(),
287            ),
288            ServerMode::Frontend {
289                rpc_bind_addr,
290                mysql_addr,
291                postgres_addr,
292                ..
293            } => (
294                String::new(),
295                rpc_bind_addr.clone(),
296                mysql_addr.clone(),
297                postgres_addr.clone(),
298            ),
299            ServerMode::Datanode {
300                rpc_bind_addr,
301                metasrv_addr,
302                ..
303            } => (
304                metasrv_addr.clone(),
305                rpc_bind_addr.clone(),
306                String::new(),
307                String::new(),
308            ),
309            _ => (String::new(), String::new(), String::new(), String::new()),
310        };
311
312        let ctx = ConfigContext {
313            wal_dir,
314            data_home: data_home.display().to_string(),
315            procedure_dir,
316            is_raft_engine: db_ctx.is_raft_engine(),
317            kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
318            use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
319            store_addrs: db_ctx
320                .store_config()
321                .store_addrs
322                .iter()
323                .map(|p| format!("\"{p}\""))
324                .collect::<Vec<_>>()
325                .join(","),
326            instance_id: id,
327            metasrv_addr,
328            grpc_addr,
329            mysql_addr,
330            postgres_addr,
331        };
332
333        let rendered = tt.render(self.name(), &ctx).unwrap();
334
335        let conf_file = data_home
336            .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
337            .display()
338            .to_string();
339        println!(
340            "Generating id {}, {} config file in {conf_file}",
341            id,
342            self.name()
343        );
344        std::fs::write(&conf_file, rendered).unwrap();
345
346        conf_file
347    }
348
349    pub fn get_args(
350        &self,
351        sqlness_home: &Path,
352        env: &Env,
353        db_ctx: &GreptimeDBContext,
354        id: usize,
355    ) -> Vec<String> {
356        let mut args = env
357            .extra_args()
358            .iter()
359            .map(String::as_str)
360            .chain([DEFAULT_LOG_LEVEL, self.name(), "start"])
361            .map(ToString::to_string)
362            .collect::<Vec<String>>();
363
364        match self {
365            ServerMode::Standalone {
366                http_addr,
367                rpc_bind_addr,
368                mysql_addr,
369                postgres_addr,
370            } => {
371                args.extend([
372                    format!(
373                        "--log-dir={}/greptimedb-{}-standalone/logs",
374                        sqlness_home.display(),
375                        id
376                    ),
377                    "-c".to_string(),
378                    self.generate_config_file(sqlness_home, db_ctx, id),
379                    format!("--http-addr={http_addr}"),
380                    format!("--rpc-addr={rpc_bind_addr}"),
381                    format!("--mysql-addr={mysql_addr}"),
382                    format!("--postgres-addr={postgres_addr}"),
383                ]);
384            }
385            ServerMode::Frontend {
386                http_addr,
387                rpc_bind_addr,
388                mysql_addr,
389                postgres_addr,
390                metasrv_addr,
391            } => {
392                args.extend([
393                    format!("--metasrv-addrs={metasrv_addr}"),
394                    format!("--http-addr={http_addr}"),
395                    format!("--rpc-addr={rpc_bind_addr}"),
396                    // since sqlness run on local, bind addr is the same as server addr
397                    // this is needed so that `cluster_info`'s server addr column can be correct
398                    format!("--rpc-server-addr={rpc_bind_addr}"),
399                    format!("--mysql-addr={mysql_addr}"),
400                    format!("--postgres-addr={postgres_addr}"),
401                    format!(
402                        "--log-dir={}/greptimedb-{}-frontend/logs",
403                        sqlness_home.display(),
404                        id
405                    ),
406                    "-c".to_string(),
407                    self.generate_config_file(sqlness_home, db_ctx, id),
408                ]);
409            }
410            ServerMode::Metasrv {
411                rpc_bind_addr,
412                rpc_server_addr,
413                http_addr,
414            } => {
415                args.extend([
416                    "--bind-addr".to_string(),
417                    rpc_bind_addr.clone(),
418                    "--server-addr".to_string(),
419                    rpc_server_addr.clone(),
420                    "--enable-region-failover".to_string(),
421                    "false".to_string(),
422                    format!("--http-addr={http_addr}"),
423                    format!(
424                        "--log-dir={}/greptimedb-{}-metasrv/logs",
425                        sqlness_home.display(),
426                        id
427                    ),
428                    "-c".to_string(),
429                    self.generate_config_file(sqlness_home, db_ctx, id),
430                ]);
431
432                if matches!(
433                    db_ctx.store_config().setup_pg,
434                    Some(ServiceProvider::Create)
435                ) {
436                    let client_ports = db_ctx
437                        .store_config()
438                        .store_addrs
439                        .iter()
440                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
441                        .collect::<Vec<_>>();
442                    let client_port = client_ports.first().unwrap_or(&5432);
443                    let pg_server_addr = format!(
444                        "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
445                        client_port
446                    );
447                    args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
448                    args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
449                } else if let Some(ServiceProvider::External(connection_string)) =
450                    db_ctx.store_config().setup_pg
451                {
452                    println!("Using external PostgreSQL '{connection_string}' as Kvbackend");
453                    args.extend([
454                        "--backend".to_string(),
455                        "postgres-store".to_string(),
456                        "--store-addrs".to_string(),
457                        connection_string,
458                    ]);
459                } else if matches!(
460                    db_ctx.store_config().setup_mysql,
461                    Some(ServiceProvider::Create)
462                ) {
463                    let client_ports = db_ctx
464                        .store_config()
465                        .store_addrs
466                        .iter()
467                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
468                        .collect::<Vec<_>>();
469                    let client_port = client_ports.first().unwrap_or(&3306);
470                    let mysql_server_addr =
471                        format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
472                    args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
473                    args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
474                } else if let Some(ServiceProvider::External(connection_string)) =
475                    db_ctx.store_config().setup_mysql
476                {
477                    println!("Using external MySQL '{connection_string}' as Kvbackend");
478                    args.extend([
479                        "--backend".to_string(),
480                        "mysql-store".to_string(),
481                        "--store-addrs".to_string(),
482                        connection_string,
483                    ]);
484                } else if db_ctx.store_config().store_addrs.is_empty() {
485                    args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
486                }
487            }
488            ServerMode::Datanode {
489                rpc_bind_addr,
490                rpc_server_addr,
491                http_addr,
492                metasrv_addr,
493                node_id,
494            } => {
495                let data_home = sqlness_home.join(format!(
496                    "greptimedb_{}_datanode_{}_{node_id}",
497                    id,
498                    db_ctx.time()
499                ));
500                args.extend([
501                    format!("--rpc-addr={rpc_bind_addr}"),
502                    format!("--rpc-server-addr={rpc_server_addr}"),
503                    format!("--http-addr={http_addr}"),
504                    format!("--data-home={}", data_home.display()),
505                    format!("--log-dir={}/logs", data_home.display()),
506                    format!("--node-id={node_id}"),
507                    "-c".to_string(),
508                    self.generate_config_file(sqlness_home, db_ctx, id),
509                    format!("--metasrv-addrs={metasrv_addr}"),
510                ]);
511            }
512            ServerMode::Flownode {
513                rpc_bind_addr,
514                rpc_server_addr,
515                http_addr,
516                metasrv_addr,
517                node_id,
518            } => {
519                args.extend([
520                    format!("--rpc-addr={rpc_bind_addr}"),
521                    format!("--rpc-server-addr={rpc_server_addr}"),
522                    format!("--node-id={node_id}"),
523                    format!(
524                        "--log-dir={}/greptimedb-{}-flownode/logs",
525                        sqlness_home.display(),
526                        id
527                    ),
528                    format!("--metasrv-addrs={metasrv_addr}"),
529                    format!("--http-addr={http_addr}"),
530                ]);
531            }
532        }
533
534        args
535    }
536}