Skip to main content

sqlness_runner/
server_mode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::cmd::bare::ServerAddr;
23use crate::env::bare::{Env, GreptimeDBContext, ServiceProvider};
24use crate::util;
25
/// `--log-level` argument that `ServerMode::get_args` places right after the
/// environment's extra arguments for every server process it launches.
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
27
/// Ports already handed out by this process, created lazily on first access.
static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();

/// Returns the process-wide set of ports that have already been handed out.
///
/// The set starts empty and is created on the first call; every later call
/// returns the same `Mutex`.
fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
    USED_PORTS.get_or_init(Mutex::default)
}
33
34fn get_unique_random_port() -> u16 {
35    // Tricky loop 100 times to find an unused port instead of infinite loop.
36    const MAX_ATTEMPTS: usize = 100;
37
38    for _ in 0..MAX_ATTEMPTS {
39        let p = util::get_random_port();
40        let mut used = get_used_ports().lock().unwrap();
41        if !used.contains(&p) {
42            used.insert(p);
43            return p;
44        }
45    }
46
47    panic!(
48        "Failed to find an unused port after {} attempts",
49        MAX_ATTEMPTS
50    );
51}
52
/// The kind of server process to launch, together with the addresses it is
/// started with.
///
/// All addresses are `host:port` strings.
#[derive(Debug, Clone)]
pub enum ServerMode {
    /// A single standalone server.
    Standalone {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
    },
    /// A frontend node of a cluster.
    Frontend {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as both `--rpc-addr` and `--rpc-server-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
    },
    /// A metasrv node of a cluster.
    Metasrv {
        /// Passed as `--bind-addr`.
        rpc_bind_addr: String,
        /// Passed as `--server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
    },
    /// A datanode of a cluster.
    Datanode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
    /// A flownode of a cluster.
    Flownode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
}
88
89#[derive(Serialize)]
90struct ConfigContext {
91    wal_dir: String,
92    data_home: String,
93    procedure_dir: String,
94    is_raft_engine: bool,
95    kafka_wal_broker_endpoints: String,
96    use_etcd: bool,
97    store_addrs: String,
98    instance_id: usize,
99    addrs: HashMap<String, String>,
100    // enable flat format for storage engine
101    enable_flat_format: bool,
102}
103
104impl ServerMode {
105    pub fn random_standalone() -> Self {
106        let http_port = get_unique_random_port();
107        let rpc_port = get_unique_random_port();
108        let mysql_port = get_unique_random_port();
109        let postgres_port = get_unique_random_port();
110
111        ServerMode::Standalone {
112            http_addr: format!("127.0.0.1:{http_port}"),
113            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
114            mysql_addr: format!("127.0.0.1:{mysql_port}"),
115            postgres_addr: format!("127.0.0.1:{postgres_port}"),
116        }
117    }
118
119    pub fn random_frontend(metasrv_port: u16) -> Self {
120        let http_port = get_unique_random_port();
121        let rpc_port = get_unique_random_port();
122        let mysql_port = get_unique_random_port();
123        let postgres_port = get_unique_random_port();
124
125        ServerMode::Frontend {
126            http_addr: format!("127.0.0.1:{http_port}"),
127            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
128            mysql_addr: format!("127.0.0.1:{mysql_port}"),
129            postgres_addr: format!("127.0.0.1:{postgres_port}"),
130            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
131        }
132    }
133
134    pub fn random_metasrv() -> Self {
135        let bind_port = get_unique_random_port();
136        let http_port = get_unique_random_port();
137
138        ServerMode::Metasrv {
139            rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
140            rpc_server_addr: format!("127.0.0.1:{bind_port}"),
141            http_addr: format!("127.0.0.1:{http_port}"),
142        }
143    }
144
145    pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
146        let rpc_port = get_unique_random_port();
147        let http_port = get_unique_random_port();
148
149        ServerMode::Datanode {
150            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
151            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
152            http_addr: format!("127.0.0.1:{http_port}"),
153            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
154            node_id,
155        }
156    }
157
158    pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
159        let rpc_port = get_unique_random_port();
160        let http_port = get_unique_random_port();
161
162        ServerMode::Flownode {
163            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
164            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
165            http_addr: format!("127.0.0.1:{http_port}"),
166            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
167            node_id,
168        }
169    }
170
171    pub fn name(&self) -> &'static str {
172        match self {
173            ServerMode::Standalone { .. } => "standalone",
174            ServerMode::Frontend { .. } => "frontend",
175            ServerMode::Metasrv { .. } => "metasrv",
176            ServerMode::Datanode { .. } => "datanode",
177            ServerMode::Flownode { .. } => "flownode",
178        }
179    }
180
181    /// Returns the addresses of the server that needed to be checked.
182    pub fn check_addrs(&self) -> Vec<String> {
183        match self {
184            ServerMode::Standalone {
185                rpc_bind_addr,
186                mysql_addr,
187                postgres_addr,
188                http_addr,
189                ..
190            } => {
191                vec![
192                    rpc_bind_addr.clone(),
193                    mysql_addr.clone(),
194                    postgres_addr.clone(),
195                    http_addr.clone(),
196                ]
197            }
198            ServerMode::Frontend {
199                rpc_bind_addr,
200                mysql_addr,
201                postgres_addr,
202                ..
203            } => {
204                vec![
205                    rpc_bind_addr.clone(),
206                    mysql_addr.clone(),
207                    postgres_addr.clone(),
208                ]
209            }
210            ServerMode::Metasrv { rpc_bind_addr, .. } => {
211                vec![rpc_bind_addr.clone()]
212            }
213            ServerMode::Datanode { rpc_bind_addr, .. } => {
214                vec![rpc_bind_addr.clone()]
215            }
216            ServerMode::Flownode { rpc_bind_addr, .. } => {
217                vec![rpc_bind_addr.clone()]
218            }
219        }
220    }
221
222    /// Returns the server addresses to connect. Only standalone and frontend mode have this.
223    pub fn server_addr(&self) -> Option<ServerAddr> {
224        match self {
225            ServerMode::Standalone {
226                rpc_bind_addr,
227                mysql_addr,
228                postgres_addr,
229                ..
230            } => Some(ServerAddr {
231                server_addr: Some(rpc_bind_addr.clone()),
232                pg_server_addr: Some(postgres_addr.clone()),
233                mysql_server_addr: Some(mysql_addr.clone()),
234            }),
235            ServerMode::Frontend {
236                rpc_bind_addr,
237                mysql_addr,
238                postgres_addr,
239                ..
240            } => Some(ServerAddr {
241                server_addr: Some(rpc_bind_addr.clone()),
242                pg_server_addr: Some(postgres_addr.clone()),
243                mysql_server_addr: Some(mysql_addr.clone()),
244            }),
245            _ => None,
246        }
247    }
248
249    pub fn generate_config_file(
250        &self,
251        sqlness_home: &Path,
252        db_ctx: &GreptimeDBContext,
253        id: usize,
254    ) -> String {
255        let mut tt = TinyTemplate::new();
256
257        let mut path = util::sqlness_conf_path();
258        path.push(format!("{}-test.toml.template", self.name()));
259        let template = std::fs::read_to_string(&path)
260            .unwrap_or_else(|e| panic!("read file '{}' error: {e}", path.display()));
261        tt.add_template(self.name(), &template).unwrap();
262
263        let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
264        std::fs::create_dir_all(data_home.as_path()).unwrap();
265
266        let wal_dir = data_home.join("wal").display().to_string();
267        let procedure_dir = data_home.join("procedure").display().to_string();
268
269        // Get the required addresses based on server mode
270        let addrs: HashMap<String, String> = match self {
271            ServerMode::Standalone {
272                rpc_bind_addr,
273                mysql_addr,
274                postgres_addr,
275                http_addr,
276            } => [
277                ("http_addr".to_string(), http_addr.clone()),
278                ("grpc_addr".to_string(), rpc_bind_addr.clone()),
279                ("mysql_addr".to_string(), mysql_addr.clone()),
280                ("postgres_addr".to_string(), postgres_addr.clone()),
281            ]
282            .into(),
283            ServerMode::Frontend { rpc_bind_addr, .. } => {
284                [("grpc_addr".to_string(), rpc_bind_addr.clone())].into()
285            }
286            ServerMode::Datanode { metasrv_addr, .. } => {
287                [("metasrv_addr".to_string(), metasrv_addr.clone())].into()
288            }
289            _ => HashMap::new(),
290        };
291
292        let ctx = ConfigContext {
293            wal_dir,
294            data_home: data_home.display().to_string(),
295            procedure_dir,
296            is_raft_engine: db_ctx.is_raft_engine(),
297            kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
298            use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
299            store_addrs: db_ctx
300                .store_config()
301                .store_addrs
302                .iter()
303                .map(|p| format!("\"{p}\""))
304                .collect::<Vec<_>>()
305                .join(","),
306            instance_id: id,
307            addrs,
308            enable_flat_format: db_ctx.store_config().enable_flat_format,
309        };
310
311        let rendered = tt.render(self.name(), &ctx).unwrap();
312
313        let conf_file = data_home
314            .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
315            .display()
316            .to_string();
317        println!(
318            "Generating id {}, {} config file in {conf_file}",
319            id,
320            self.name()
321        );
322        std::fs::write(&conf_file, rendered).unwrap();
323
324        conf_file
325    }
326
327    pub fn get_args(
328        &self,
329        sqlness_home: &Path,
330        env: &Env,
331        db_ctx: &GreptimeDBContext,
332        id: usize,
333    ) -> Vec<String> {
334        let mut args = env
335            .extra_args()
336            .iter()
337            .map(String::as_str)
338            .chain([DEFAULT_LOG_LEVEL, self.name(), "start"])
339            .map(ToString::to_string)
340            .collect::<Vec<String>>();
341
342        match self {
343            ServerMode::Standalone {
344                http_addr,
345                rpc_bind_addr,
346                mysql_addr,
347                postgres_addr,
348            } => {
349                args.extend([
350                    format!(
351                        "--log-dir={}/greptimedb-{}-standalone/logs",
352                        sqlness_home.display(),
353                        id
354                    ),
355                    "-c".to_string(),
356                    self.generate_config_file(sqlness_home, db_ctx, id),
357                    format!("--http-addr={http_addr}"),
358                    format!("--rpc-addr={rpc_bind_addr}"),
359                    format!("--mysql-addr={mysql_addr}"),
360                    format!("--postgres-addr={postgres_addr}"),
361                ]);
362            }
363            ServerMode::Frontend {
364                http_addr,
365                rpc_bind_addr,
366                mysql_addr,
367                postgres_addr,
368                metasrv_addr,
369            } => {
370                args.extend([
371                    format!("--metasrv-addrs={metasrv_addr}"),
372                    format!("--http-addr={http_addr}"),
373                    format!("--rpc-addr={rpc_bind_addr}"),
374                    // since sqlness run on local, bind addr is the same as server addr
375                    // this is needed so that `cluster_info`'s server addr column can be correct
376                    format!("--rpc-server-addr={rpc_bind_addr}"),
377                    format!("--mysql-addr={mysql_addr}"),
378                    format!("--postgres-addr={postgres_addr}"),
379                    format!(
380                        "--log-dir={}/greptimedb-{}-frontend/logs",
381                        sqlness_home.display(),
382                        id
383                    ),
384                    "-c".to_string(),
385                    self.generate_config_file(sqlness_home, db_ctx, id),
386                ]);
387            }
388            ServerMode::Metasrv {
389                rpc_bind_addr,
390                rpc_server_addr,
391                http_addr,
392            } => {
393                args.extend([
394                    "--bind-addr".to_string(),
395                    rpc_bind_addr.clone(),
396                    "--server-addr".to_string(),
397                    rpc_server_addr.clone(),
398                    "--enable-region-failover".to_string(),
399                    "false".to_string(),
400                    format!("--http-addr={http_addr}"),
401                    format!(
402                        "--log-dir={}/greptimedb-{}-metasrv/logs",
403                        sqlness_home.display(),
404                        id
405                    ),
406                    "-c".to_string(),
407                    self.generate_config_file(sqlness_home, db_ctx, id),
408                ]);
409
410                if matches!(
411                    db_ctx.store_config().setup_pg,
412                    Some(ServiceProvider::Create)
413                ) {
414                    let client_ports = db_ctx
415                        .store_config()
416                        .store_addrs
417                        .iter()
418                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
419                        .collect::<Vec<_>>();
420                    let client_port = client_ports.first().unwrap_or(&5432);
421                    let pg_server_addr = format!(
422                        "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
423                        client_port
424                    );
425                    args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
426                    args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
427                } else if let Some(ServiceProvider::External(connection_string)) =
428                    db_ctx.store_config().setup_pg
429                {
430                    println!("Using external PostgreSQL '{connection_string}' as Kvbackend");
431                    args.extend([
432                        "--backend".to_string(),
433                        "postgres-store".to_string(),
434                        "--store-addrs".to_string(),
435                        connection_string,
436                    ]);
437                } else if matches!(
438                    db_ctx.store_config().setup_mysql,
439                    Some(ServiceProvider::Create)
440                ) {
441                    let client_ports = db_ctx
442                        .store_config()
443                        .store_addrs
444                        .iter()
445                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
446                        .collect::<Vec<_>>();
447                    let client_port = client_ports.first().unwrap_or(&3306);
448                    let mysql_server_addr =
449                        format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
450                    args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
451                    args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
452                } else if let Some(ServiceProvider::External(connection_string)) =
453                    db_ctx.store_config().setup_mysql
454                {
455                    println!("Using external MySQL '{connection_string}' as Kvbackend");
456                    args.extend([
457                        "--backend".to_string(),
458                        "mysql-store".to_string(),
459                        "--store-addrs".to_string(),
460                        connection_string,
461                    ]);
462                } else if db_ctx.store_config().store_addrs.is_empty() {
463                    args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
464                }
465            }
466            ServerMode::Datanode {
467                rpc_bind_addr,
468                rpc_server_addr,
469                http_addr,
470                metasrv_addr,
471                node_id,
472            } => {
473                let data_home = sqlness_home.join(format!(
474                    "greptimedb_{}_datanode_{}_{node_id}",
475                    id,
476                    db_ctx.time()
477                ));
478                args.extend([
479                    format!("--rpc-addr={rpc_bind_addr}"),
480                    format!("--rpc-server-addr={rpc_server_addr}"),
481                    format!("--http-addr={http_addr}"),
482                    format!("--data-home={}", data_home.display()),
483                    format!("--log-dir={}/logs", data_home.display()),
484                    format!("--node-id={node_id}"),
485                    "-c".to_string(),
486                    self.generate_config_file(sqlness_home, db_ctx, id),
487                    format!("--metasrv-addrs={metasrv_addr}"),
488                ]);
489            }
490            ServerMode::Flownode {
491                rpc_bind_addr,
492                rpc_server_addr,
493                http_addr,
494                metasrv_addr,
495                node_id,
496            } => {
497                args.extend([
498                    format!("--rpc-addr={rpc_bind_addr}"),
499                    format!("--rpc-server-addr={rpc_server_addr}"),
500                    format!("--node-id={node_id}"),
501                    format!(
502                        "--log-dir={}/greptimedb-{}-flownode/logs",
503                        sqlness_home.display(),
504                        id
505                    ),
506                    format!("--metasrv-addrs={metasrv_addr}"),
507                    format!("--http-addr={http_addr}"),
508                ]);
509            }
510        }
511
512        args
513    }
514}