sqlness_runner/
server_mode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::env::{Env, GreptimeDBContext};
23use crate::{util, ServerAddr};
24
25const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
26
27static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();
28
29fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
30    USED_PORTS.get_or_init(|| Mutex::new(HashSet::new()))
31}
32
33fn get_unique_random_port() -> u16 {
34    // Tricky loop 100 times to find an unused port instead of infinite loop.
35    const MAX_ATTEMPTS: usize = 100;
36
37    for _ in 0..MAX_ATTEMPTS {
38        let p = util::get_random_port();
39        let mut used = get_used_ports().lock().unwrap();
40        if !used.contains(&p) {
41            used.insert(p);
42            return p;
43        }
44    }
45
46    panic!(
47        "Failed to find an unused port after {} attempts",
48        MAX_ATTEMPTS
49    );
50}
51
52#[derive(Clone)]
53pub enum ServerMode {
54    Standalone {
55        http_addr: String,
56        rpc_bind_addr: String,
57        mysql_addr: String,
58        postgres_addr: String,
59    },
60    Frontend {
61        http_addr: String,
62        rpc_bind_addr: String,
63        mysql_addr: String,
64        postgres_addr: String,
65        metasrv_addr: String,
66    },
67    Metasrv {
68        rpc_bind_addr: String,
69        rpc_server_addr: String,
70        http_addr: String,
71    },
72    Datanode {
73        rpc_bind_addr: String,
74        rpc_server_addr: String,
75        http_addr: String,
76        metasrv_addr: String,
77        node_id: u32,
78    },
79    Flownode {
80        rpc_bind_addr: String,
81        rpc_server_addr: String,
82        http_addr: String,
83        metasrv_addr: String,
84        node_id: u32,
85    },
86}
87
88#[derive(Serialize)]
89struct ConfigContext {
90    wal_dir: String,
91    data_home: String,
92    procedure_dir: String,
93    is_raft_engine: bool,
94    kafka_wal_broker_endpoints: String,
95    use_etcd: bool,
96    store_addrs: String,
97    instance_id: usize,
98    // for following addrs, leave it empty if not needed
99    // required for datanode
100    metasrv_addr: String,
101    // for frontend and standalone
102    grpc_addr: String,
103    // for standalone
104    mysql_addr: String,
105    // for standalone
106    postgres_addr: String,
107}
108
109impl ServerMode {
110    pub fn random_standalone() -> Self {
111        let http_port = get_unique_random_port();
112        let rpc_port = get_unique_random_port();
113        let mysql_port = get_unique_random_port();
114        let postgres_port = get_unique_random_port();
115
116        ServerMode::Standalone {
117            http_addr: format!("127.0.0.1:{http_port}"),
118            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
119            mysql_addr: format!("127.0.0.1:{mysql_port}"),
120            postgres_addr: format!("127.0.0.1:{postgres_port}"),
121        }
122    }
123
124    pub fn random_frontend(metasrv_port: u16) -> Self {
125        let http_port = get_unique_random_port();
126        let rpc_port = get_unique_random_port();
127        let mysql_port = get_unique_random_port();
128        let postgres_port = get_unique_random_port();
129
130        ServerMode::Frontend {
131            http_addr: format!("127.0.0.1:{http_port}"),
132            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
133            mysql_addr: format!("127.0.0.1:{mysql_port}"),
134            postgres_addr: format!("127.0.0.1:{postgres_port}"),
135            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
136        }
137    }
138
139    pub fn random_metasrv() -> Self {
140        let bind_port = get_unique_random_port();
141        let http_port = get_unique_random_port();
142
143        ServerMode::Metasrv {
144            rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
145            rpc_server_addr: format!("127.0.0.1:{bind_port}"),
146            http_addr: format!("127.0.0.1:{http_port}"),
147        }
148    }
149
150    pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
151        let rpc_port = get_unique_random_port();
152        let http_port = get_unique_random_port();
153
154        ServerMode::Datanode {
155            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
156            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
157            http_addr: format!("127.0.0.1:{http_port}"),
158            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
159            node_id,
160        }
161    }
162
163    pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
164        let rpc_port = get_unique_random_port();
165        let http_port = get_unique_random_port();
166
167        ServerMode::Flownode {
168            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
169            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
170            http_addr: format!("127.0.0.1:{http_port}"),
171            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
172            node_id,
173        }
174    }
175
176    pub fn name(&self) -> &'static str {
177        match self {
178            ServerMode::Standalone { .. } => "standalone",
179            ServerMode::Frontend { .. } => "frontend",
180            ServerMode::Metasrv { .. } => "metasrv",
181            ServerMode::Datanode { .. } => "datanode",
182            ServerMode::Flownode { .. } => "flownode",
183        }
184    }
185
186    /// Returns the addresses of the server that needed to be checked.
187    pub fn check_addrs(&self) -> Vec<String> {
188        match self {
189            ServerMode::Standalone {
190                rpc_bind_addr,
191                mysql_addr,
192                postgres_addr,
193                http_addr,
194                ..
195            } => {
196                vec![
197                    rpc_bind_addr.clone(),
198                    mysql_addr.clone(),
199                    postgres_addr.clone(),
200                    http_addr.clone(),
201                ]
202            }
203            ServerMode::Frontend {
204                rpc_bind_addr,
205                mysql_addr,
206                postgres_addr,
207                ..
208            } => {
209                vec![
210                    rpc_bind_addr.clone(),
211                    mysql_addr.clone(),
212                    postgres_addr.clone(),
213                ]
214            }
215            ServerMode::Metasrv { rpc_bind_addr, .. } => {
216                vec![rpc_bind_addr.clone()]
217            }
218            ServerMode::Datanode { rpc_bind_addr, .. } => {
219                vec![rpc_bind_addr.clone()]
220            }
221            ServerMode::Flownode { rpc_bind_addr, .. } => {
222                vec![rpc_bind_addr.clone()]
223            }
224        }
225    }
226
227    /// Returns the server addresses to connect. Only standalone and frontend mode have this.
228    pub fn server_addr(&self) -> Option<ServerAddr> {
229        match self {
230            ServerMode::Standalone {
231                rpc_bind_addr,
232                mysql_addr,
233                postgres_addr,
234                ..
235            } => Some(ServerAddr {
236                server_addr: Some(rpc_bind_addr.clone()),
237                pg_server_addr: Some(postgres_addr.clone()),
238                mysql_server_addr: Some(mysql_addr.clone()),
239            }),
240            ServerMode::Frontend {
241                rpc_bind_addr,
242                mysql_addr,
243                postgres_addr,
244                ..
245            } => Some(ServerAddr {
246                server_addr: Some(rpc_bind_addr.clone()),
247                pg_server_addr: Some(postgres_addr.clone()),
248                mysql_server_addr: Some(mysql_addr.clone()),
249            }),
250            _ => None,
251        }
252    }
253
254    pub fn generate_config_file(
255        &self,
256        sqlness_home: &Path,
257        db_ctx: &GreptimeDBContext,
258        id: usize,
259    ) -> String {
260        let mut tt = TinyTemplate::new();
261
262        let mut path = util::sqlness_conf_path();
263        path.push(format!("{}-test.toml.template", self.name()));
264        let template = std::fs::read_to_string(path).unwrap();
265        tt.add_template(self.name(), &template).unwrap();
266
267        let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
268        std::fs::create_dir_all(data_home.as_path()).unwrap();
269
270        let wal_dir = data_home.join("wal").display().to_string();
271        let procedure_dir = data_home.join("procedure").display().to_string();
272
273        // Get the required addresses based on server mode
274        let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
275            ServerMode::Standalone {
276                rpc_bind_addr,
277                mysql_addr,
278                postgres_addr,
279                ..
280            } => (
281                String::new(),
282                rpc_bind_addr.clone(),
283                mysql_addr.clone(),
284                postgres_addr.clone(),
285            ),
286            ServerMode::Frontend {
287                rpc_bind_addr,
288                mysql_addr,
289                postgres_addr,
290                ..
291            } => (
292                String::new(),
293                rpc_bind_addr.clone(),
294                mysql_addr.clone(),
295                postgres_addr.clone(),
296            ),
297            ServerMode::Datanode {
298                rpc_bind_addr,
299                metasrv_addr,
300                ..
301            } => (
302                metasrv_addr.clone(),
303                rpc_bind_addr.clone(),
304                String::new(),
305                String::new(),
306            ),
307            _ => (String::new(), String::new(), String::new(), String::new()),
308        };
309
310        let ctx = ConfigContext {
311            wal_dir,
312            data_home: data_home.display().to_string(),
313            procedure_dir,
314            is_raft_engine: db_ctx.is_raft_engine(),
315            kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
316            use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
317            store_addrs: db_ctx
318                .store_config()
319                .store_addrs
320                .iter()
321                .map(|p| format!("\"{p}\""))
322                .collect::<Vec<_>>()
323                .join(","),
324            instance_id: id,
325            metasrv_addr,
326            grpc_addr,
327            mysql_addr,
328            postgres_addr,
329        };
330
331        let rendered = tt.render(self.name(), &ctx).unwrap();
332
333        let conf_file = data_home
334            .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
335            .display()
336            .to_string();
337        println!(
338            "Generating id {}, {} config file in {conf_file}",
339            id,
340            self.name()
341        );
342        std::fs::write(&conf_file, rendered).unwrap();
343
344        conf_file
345    }
346
347    pub fn get_args(
348        &self,
349        sqlness_home: &Path,
350        _env: &Env,
351        db_ctx: &GreptimeDBContext,
352        id: usize,
353    ) -> Vec<String> {
354        let mut args = vec![
355            DEFAULT_LOG_LEVEL.to_string(),
356            self.name().to_string(),
357            "start".to_string(),
358        ];
359
360        match self {
361            ServerMode::Standalone {
362                http_addr,
363                rpc_bind_addr,
364                mysql_addr,
365                postgres_addr,
366            } => {
367                args.extend([
368                    format!(
369                        "--log-dir={}/greptimedb-{}-standalone/logs",
370                        sqlness_home.display(),
371                        id
372                    ),
373                    "-c".to_string(),
374                    self.generate_config_file(sqlness_home, db_ctx, id),
375                    format!("--http-addr={http_addr}"),
376                    format!("--rpc-addr={rpc_bind_addr}"),
377                    format!("--mysql-addr={mysql_addr}"),
378                    format!("--postgres-addr={postgres_addr}"),
379                ]);
380            }
381            ServerMode::Frontend {
382                http_addr,
383                rpc_bind_addr,
384                mysql_addr,
385                postgres_addr,
386                metasrv_addr,
387            } => {
388                args.extend([
389                    format!("--metasrv-addrs={metasrv_addr}"),
390                    format!("--http-addr={http_addr}"),
391                    format!("--rpc-addr={rpc_bind_addr}"),
392                    // since sqlness run on local, bind addr is the same as server addr
393                    // this is needed so that `cluster_info`'s server addr column can be correct
394                    format!("--rpc-server-addr={rpc_bind_addr}"),
395                    format!("--mysql-addr={mysql_addr}"),
396                    format!("--postgres-addr={postgres_addr}"),
397                    format!(
398                        "--log-dir={}/greptimedb-{}-frontend/logs",
399                        sqlness_home.display(),
400                        id
401                    ),
402                    "-c".to_string(),
403                    self.generate_config_file(sqlness_home, db_ctx, id),
404                ]);
405            }
406            ServerMode::Metasrv {
407                rpc_bind_addr,
408                rpc_server_addr,
409                http_addr,
410            } => {
411                args.extend([
412                    "--bind-addr".to_string(),
413                    rpc_bind_addr.clone(),
414                    "--server-addr".to_string(),
415                    rpc_server_addr.clone(),
416                    "--enable-region-failover".to_string(),
417                    "false".to_string(),
418                    format!("--http-addr={http_addr}"),
419                    format!(
420                        "--log-dir={}/greptimedb-{}-metasrv/logs",
421                        sqlness_home.display(),
422                        id
423                    ),
424                    "-c".to_string(),
425                    self.generate_config_file(sqlness_home, db_ctx, id),
426                ]);
427
428                if db_ctx.store_config().setup_pg {
429                    let client_ports = db_ctx
430                        .store_config()
431                        .store_addrs
432                        .iter()
433                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
434                        .collect::<Vec<_>>();
435                    let client_port = client_ports.first().unwrap_or(&5432);
436                    let pg_server_addr = format!(
437                        "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
438                        client_port
439                    );
440                    args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
441                    args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
442                } else if db_ctx.store_config().setup_mysql {
443                    let client_ports = db_ctx
444                        .store_config()
445                        .store_addrs
446                        .iter()
447                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
448                        .collect::<Vec<_>>();
449                    let client_port = client_ports.first().unwrap_or(&3306);
450                    let mysql_server_addr =
451                        format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
452                    args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
453                    args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
454                } else if db_ctx.store_config().store_addrs.is_empty() {
455                    args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
456                }
457            }
458            ServerMode::Datanode {
459                rpc_bind_addr,
460                rpc_server_addr,
461                http_addr,
462                metasrv_addr,
463                node_id,
464            } => {
465                let data_home = sqlness_home.join(format!(
466                    "greptimedb_{}_datanode_{}_{node_id}",
467                    id,
468                    db_ctx.time()
469                ));
470                args.extend([
471                    format!("--rpc-addr={rpc_bind_addr}"),
472                    format!("--rpc-server-addr={rpc_server_addr}"),
473                    format!("--http-addr={http_addr}"),
474                    format!("--data-home={}", data_home.display()),
475                    format!("--log-dir={}/logs", data_home.display()),
476                    format!("--node-id={node_id}"),
477                    "-c".to_string(),
478                    self.generate_config_file(sqlness_home, db_ctx, id),
479                    format!("--metasrv-addrs={metasrv_addr}"),
480                ]);
481            }
482            ServerMode::Flownode {
483                rpc_bind_addr,
484                rpc_server_addr,
485                http_addr,
486                metasrv_addr,
487                node_id,
488            } => {
489                args.extend([
490                    format!("--rpc-addr={rpc_bind_addr}"),
491                    format!("--rpc-server-addr={rpc_server_addr}"),
492                    format!("--node-id={node_id}"),
493                    format!(
494                        "--log-dir={}/greptimedb-{}-flownode/logs",
495                        sqlness_home.display(),
496                        id
497                    ),
498                    format!("--metasrv-addrs={metasrv_addr}"),
499                    format!("--http-addr={http_addr}"),
500                ]);
501            }
502        }
503
504        args
505    }
506}