sqlness_runner/
server_mode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::{Mutex, OnceLock};
18
19use serde::Serialize;
20use tinytemplate::TinyTemplate;
21
22use crate::cmd::bare::ServerAddr;
23use crate::env::bare::{Env, GreptimeDBContext, ServiceProvider};
24use crate::util;
25
/// `--log-level` flag added to every server command line built by `ServerMode::get_args`,
/// placed after the environment's extra arguments and right before the mode subcommand.
/// It sets `debug` as the base level and lists quieter per-target overrides.
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
27
/// Process-wide record of the ports that have already been handed out, created lazily on
/// first access.
static USED_PORTS: OnceLock<Mutex<HashSet<u16>>> = OnceLock::new();

/// Returns the shared set of handed-out ports, initialising it to an empty set the first
/// time it is requested.
fn get_used_ports() -> &'static Mutex<HashSet<u16>> {
    USED_PORTS.get_or_init(Mutex::default)
}
33
34fn get_unique_random_port() -> u16 {
35    // Tricky loop 100 times to find an unused port instead of infinite loop.
36    const MAX_ATTEMPTS: usize = 100;
37
38    for _ in 0..MAX_ATTEMPTS {
39        let p = util::get_random_port();
40        let mut used = get_used_ports().lock().unwrap();
41        if !used.contains(&p) {
42            used.insert(p);
43            return p;
44        }
45    }
46
47    panic!(
48        "Failed to find an unused port after {} attempts",
49        MAX_ATTEMPTS
50    );
51}
52
/// The kind of GreptimeDB process to launch, together with the addresses it is started with.
///
/// The variant name (lower-cased) is used as the CLI subcommand and as the prefix of the
/// config template file name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerMode {
    /// A single process serving every protocol.
    Standalone {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
    },
    /// The frontend of a distributed deployment.
    Frontend {
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as both `--rpc-addr` and `--rpc-server-addr`.
        rpc_bind_addr: String,
        /// Passed as `--mysql-addr`.
        mysql_addr: String,
        /// Passed as `--postgres-addr`.
        postgres_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
    },
    /// The metadata server of a distributed deployment.
    Metasrv {
        /// Passed as `--bind-addr`.
        rpc_bind_addr: String,
        /// Passed as `--server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
    },
    /// A datanode of a distributed deployment.
    Datanode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
    /// A flownode of a distributed deployment.
    Flownode {
        /// Passed as `--rpc-addr`.
        rpc_bind_addr: String,
        /// Passed as `--rpc-server-addr`.
        rpc_server_addr: String,
        /// Passed as `--http-addr`.
        http_addr: String,
        /// Passed as `--metasrv-addrs`.
        metasrv_addr: String,
        /// Passed as `--node-id`.
        node_id: u32,
    },
}
88
/// Values made available to the `<mode>-test.toml.template` templates rendered by
/// `ServerMode::generate_config_file`.
#[derive(Serialize)]
struct ConfigContext {
    /// The `wal` subdirectory of `data_home`.
    wal_dir: String,
    /// Per-instance directory, `greptimedb-<id>-<mode>` under the sqlness home.
    data_home: String,
    /// The `procedure` subdirectory of `data_home`.
    procedure_dir: String,
    /// Taken from `GreptimeDBContext::is_raft_engine`.
    is_raft_engine: bool,
    /// Taken from `GreptimeDBContext::kafka_wal_broker_endpoints`.
    kafka_wal_broker_endpoints: String,
    /// `true` when the store config lists at least one store address.
    use_etcd: bool,
    /// The store addresses, each wrapped in double quotes and joined with `,`.
    store_addrs: String,
    /// The instance id the config file is generated for.
    instance_id: usize,
    // The following addresses are left empty when the mode does not need them.
    /// Metasrv address; only filled for datanode.
    metasrv_addr: String,
    /// RPC bind address; filled for standalone, frontend and datanode.
    grpc_addr: String,
    /// MySQL address; filled for standalone and frontend.
    mysql_addr: String,
    /// PostgreSQL address; filled for standalone and frontend.
    postgres_addr: String,
    /// Whether to enable the flat format for the storage engine.
    enable_flat_format: bool,
}
111
112impl ServerMode {
113    pub fn random_standalone() -> Self {
114        let http_port = get_unique_random_port();
115        let rpc_port = get_unique_random_port();
116        let mysql_port = get_unique_random_port();
117        let postgres_port = get_unique_random_port();
118
119        ServerMode::Standalone {
120            http_addr: format!("127.0.0.1:{http_port}"),
121            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
122            mysql_addr: format!("127.0.0.1:{mysql_port}"),
123            postgres_addr: format!("127.0.0.1:{postgres_port}"),
124        }
125    }
126
127    pub fn random_frontend(metasrv_port: u16) -> Self {
128        let http_port = get_unique_random_port();
129        let rpc_port = get_unique_random_port();
130        let mysql_port = get_unique_random_port();
131        let postgres_port = get_unique_random_port();
132
133        ServerMode::Frontend {
134            http_addr: format!("127.0.0.1:{http_port}"),
135            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
136            mysql_addr: format!("127.0.0.1:{mysql_port}"),
137            postgres_addr: format!("127.0.0.1:{postgres_port}"),
138            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
139        }
140    }
141
142    pub fn random_metasrv() -> Self {
143        let bind_port = get_unique_random_port();
144        let http_port = get_unique_random_port();
145
146        ServerMode::Metasrv {
147            rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
148            rpc_server_addr: format!("127.0.0.1:{bind_port}"),
149            http_addr: format!("127.0.0.1:{http_port}"),
150        }
151    }
152
153    pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
154        let rpc_port = get_unique_random_port();
155        let http_port = get_unique_random_port();
156
157        ServerMode::Datanode {
158            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
159            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
160            http_addr: format!("127.0.0.1:{http_port}"),
161            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
162            node_id,
163        }
164    }
165
166    pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
167        let rpc_port = get_unique_random_port();
168        let http_port = get_unique_random_port();
169
170        ServerMode::Flownode {
171            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
172            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
173            http_addr: format!("127.0.0.1:{http_port}"),
174            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
175            node_id,
176        }
177    }
178
179    pub fn name(&self) -> &'static str {
180        match self {
181            ServerMode::Standalone { .. } => "standalone",
182            ServerMode::Frontend { .. } => "frontend",
183            ServerMode::Metasrv { .. } => "metasrv",
184            ServerMode::Datanode { .. } => "datanode",
185            ServerMode::Flownode { .. } => "flownode",
186        }
187    }
188
189    /// Returns the addresses of the server that needed to be checked.
190    pub fn check_addrs(&self) -> Vec<String> {
191        match self {
192            ServerMode::Standalone {
193                rpc_bind_addr,
194                mysql_addr,
195                postgres_addr,
196                http_addr,
197                ..
198            } => {
199                vec![
200                    rpc_bind_addr.clone(),
201                    mysql_addr.clone(),
202                    postgres_addr.clone(),
203                    http_addr.clone(),
204                ]
205            }
206            ServerMode::Frontend {
207                rpc_bind_addr,
208                mysql_addr,
209                postgres_addr,
210                ..
211            } => {
212                vec![
213                    rpc_bind_addr.clone(),
214                    mysql_addr.clone(),
215                    postgres_addr.clone(),
216                ]
217            }
218            ServerMode::Metasrv { rpc_bind_addr, .. } => {
219                vec![rpc_bind_addr.clone()]
220            }
221            ServerMode::Datanode { rpc_bind_addr, .. } => {
222                vec![rpc_bind_addr.clone()]
223            }
224            ServerMode::Flownode { rpc_bind_addr, .. } => {
225                vec![rpc_bind_addr.clone()]
226            }
227        }
228    }
229
230    /// Returns the server addresses to connect. Only standalone and frontend mode have this.
231    pub fn server_addr(&self) -> Option<ServerAddr> {
232        match self {
233            ServerMode::Standalone {
234                rpc_bind_addr,
235                mysql_addr,
236                postgres_addr,
237                ..
238            } => Some(ServerAddr {
239                server_addr: Some(rpc_bind_addr.clone()),
240                pg_server_addr: Some(postgres_addr.clone()),
241                mysql_server_addr: Some(mysql_addr.clone()),
242            }),
243            ServerMode::Frontend {
244                rpc_bind_addr,
245                mysql_addr,
246                postgres_addr,
247                ..
248            } => Some(ServerAddr {
249                server_addr: Some(rpc_bind_addr.clone()),
250                pg_server_addr: Some(postgres_addr.clone()),
251                mysql_server_addr: Some(mysql_addr.clone()),
252            }),
253            _ => None,
254        }
255    }
256
257    pub fn generate_config_file(
258        &self,
259        sqlness_home: &Path,
260        db_ctx: &GreptimeDBContext,
261        id: usize,
262    ) -> String {
263        let mut tt = TinyTemplate::new();
264
265        let mut path = util::sqlness_conf_path();
266        path.push(format!("{}-test.toml.template", self.name()));
267        let template = std::fs::read_to_string(&path)
268            .unwrap_or_else(|e| panic!("read file '{}' error: {e}", path.display()));
269        tt.add_template(self.name(), &template).unwrap();
270
271        let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
272        std::fs::create_dir_all(data_home.as_path()).unwrap();
273
274        let wal_dir = data_home.join("wal").display().to_string();
275        let procedure_dir = data_home.join("procedure").display().to_string();
276
277        // Get the required addresses based on server mode
278        let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
279            ServerMode::Standalone {
280                rpc_bind_addr,
281                mysql_addr,
282                postgres_addr,
283                ..
284            } => (
285                String::new(),
286                rpc_bind_addr.clone(),
287                mysql_addr.clone(),
288                postgres_addr.clone(),
289            ),
290            ServerMode::Frontend {
291                rpc_bind_addr,
292                mysql_addr,
293                postgres_addr,
294                ..
295            } => (
296                String::new(),
297                rpc_bind_addr.clone(),
298                mysql_addr.clone(),
299                postgres_addr.clone(),
300            ),
301            ServerMode::Datanode {
302                rpc_bind_addr,
303                metasrv_addr,
304                ..
305            } => (
306                metasrv_addr.clone(),
307                rpc_bind_addr.clone(),
308                String::new(),
309                String::new(),
310            ),
311            _ => (String::new(), String::new(), String::new(), String::new()),
312        };
313
314        let ctx = ConfigContext {
315            wal_dir,
316            data_home: data_home.display().to_string(),
317            procedure_dir,
318            is_raft_engine: db_ctx.is_raft_engine(),
319            kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
320            use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
321            store_addrs: db_ctx
322                .store_config()
323                .store_addrs
324                .iter()
325                .map(|p| format!("\"{p}\""))
326                .collect::<Vec<_>>()
327                .join(","),
328            instance_id: id,
329            metasrv_addr,
330            grpc_addr,
331            mysql_addr,
332            postgres_addr,
333            enable_flat_format: db_ctx.store_config().enable_flat_format,
334        };
335
336        let rendered = tt.render(self.name(), &ctx).unwrap();
337
338        let conf_file = data_home
339            .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
340            .display()
341            .to_string();
342        println!(
343            "Generating id {}, {} config file in {conf_file}",
344            id,
345            self.name()
346        );
347        std::fs::write(&conf_file, rendered).unwrap();
348
349        conf_file
350    }
351
352    pub fn get_args(
353        &self,
354        sqlness_home: &Path,
355        env: &Env,
356        db_ctx: &GreptimeDBContext,
357        id: usize,
358    ) -> Vec<String> {
359        let mut args = env
360            .extra_args()
361            .iter()
362            .map(String::as_str)
363            .chain([DEFAULT_LOG_LEVEL, self.name(), "start"])
364            .map(ToString::to_string)
365            .collect::<Vec<String>>();
366
367        match self {
368            ServerMode::Standalone {
369                http_addr,
370                rpc_bind_addr,
371                mysql_addr,
372                postgres_addr,
373            } => {
374                args.extend([
375                    format!(
376                        "--log-dir={}/greptimedb-{}-standalone/logs",
377                        sqlness_home.display(),
378                        id
379                    ),
380                    "-c".to_string(),
381                    self.generate_config_file(sqlness_home, db_ctx, id),
382                    format!("--http-addr={http_addr}"),
383                    format!("--rpc-addr={rpc_bind_addr}"),
384                    format!("--mysql-addr={mysql_addr}"),
385                    format!("--postgres-addr={postgres_addr}"),
386                ]);
387            }
388            ServerMode::Frontend {
389                http_addr,
390                rpc_bind_addr,
391                mysql_addr,
392                postgres_addr,
393                metasrv_addr,
394            } => {
395                args.extend([
396                    format!("--metasrv-addrs={metasrv_addr}"),
397                    format!("--http-addr={http_addr}"),
398                    format!("--rpc-addr={rpc_bind_addr}"),
399                    // since sqlness run on local, bind addr is the same as server addr
400                    // this is needed so that `cluster_info`'s server addr column can be correct
401                    format!("--rpc-server-addr={rpc_bind_addr}"),
402                    format!("--mysql-addr={mysql_addr}"),
403                    format!("--postgres-addr={postgres_addr}"),
404                    format!(
405                        "--log-dir={}/greptimedb-{}-frontend/logs",
406                        sqlness_home.display(),
407                        id
408                    ),
409                    "-c".to_string(),
410                    self.generate_config_file(sqlness_home, db_ctx, id),
411                ]);
412            }
413            ServerMode::Metasrv {
414                rpc_bind_addr,
415                rpc_server_addr,
416                http_addr,
417            } => {
418                args.extend([
419                    "--bind-addr".to_string(),
420                    rpc_bind_addr.clone(),
421                    "--server-addr".to_string(),
422                    rpc_server_addr.clone(),
423                    "--enable-region-failover".to_string(),
424                    "false".to_string(),
425                    format!("--http-addr={http_addr}"),
426                    format!(
427                        "--log-dir={}/greptimedb-{}-metasrv/logs",
428                        sqlness_home.display(),
429                        id
430                    ),
431                    "-c".to_string(),
432                    self.generate_config_file(sqlness_home, db_ctx, id),
433                ]);
434
435                if matches!(
436                    db_ctx.store_config().setup_pg,
437                    Some(ServiceProvider::Create)
438                ) {
439                    let client_ports = db_ctx
440                        .store_config()
441                        .store_addrs
442                        .iter()
443                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
444                        .collect::<Vec<_>>();
445                    let client_port = client_ports.first().unwrap_or(&5432);
446                    let pg_server_addr = format!(
447                        "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
448                        client_port
449                    );
450                    args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
451                    args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
452                } else if let Some(ServiceProvider::External(connection_string)) =
453                    db_ctx.store_config().setup_pg
454                {
455                    println!("Using external PostgreSQL '{connection_string}' as Kvbackend");
456                    args.extend([
457                        "--backend".to_string(),
458                        "postgres-store".to_string(),
459                        "--store-addrs".to_string(),
460                        connection_string,
461                    ]);
462                } else if matches!(
463                    db_ctx.store_config().setup_mysql,
464                    Some(ServiceProvider::Create)
465                ) {
466                    let client_ports = db_ctx
467                        .store_config()
468                        .store_addrs
469                        .iter()
470                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
471                        .collect::<Vec<_>>();
472                    let client_port = client_ports.first().unwrap_or(&3306);
473                    let mysql_server_addr =
474                        format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
475                    args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
476                    args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
477                } else if let Some(ServiceProvider::External(connection_string)) =
478                    db_ctx.store_config().setup_mysql
479                {
480                    println!("Using external MySQL '{connection_string}' as Kvbackend");
481                    args.extend([
482                        "--backend".to_string(),
483                        "mysql-store".to_string(),
484                        "--store-addrs".to_string(),
485                        connection_string,
486                    ]);
487                } else if db_ctx.store_config().store_addrs.is_empty() {
488                    args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
489                }
490            }
491            ServerMode::Datanode {
492                rpc_bind_addr,
493                rpc_server_addr,
494                http_addr,
495                metasrv_addr,
496                node_id,
497            } => {
498                let data_home = sqlness_home.join(format!(
499                    "greptimedb_{}_datanode_{}_{node_id}",
500                    id,
501                    db_ctx.time()
502                ));
503                args.extend([
504                    format!("--rpc-addr={rpc_bind_addr}"),
505                    format!("--rpc-server-addr={rpc_server_addr}"),
506                    format!("--http-addr={http_addr}"),
507                    format!("--data-home={}", data_home.display()),
508                    format!("--log-dir={}/logs", data_home.display()),
509                    format!("--node-id={node_id}"),
510                    "-c".to_string(),
511                    self.generate_config_file(sqlness_home, db_ctx, id),
512                    format!("--metasrv-addrs={metasrv_addr}"),
513                ]);
514            }
515            ServerMode::Flownode {
516                rpc_bind_addr,
517                rpc_server_addr,
518                http_addr,
519                metasrv_addr,
520                node_id,
521            } => {
522                args.extend([
523                    format!("--rpc-addr={rpc_bind_addr}"),
524                    format!("--rpc-server-addr={rpc_server_addr}"),
525                    format!("--node-id={node_id}"),
526                    format!(
527                        "--log-dir={}/greptimedb-{}-flownode/logs",
528                        sqlness_home.display(),
529                        id
530                    ),
531                    format!("--metasrv-addrs={metasrv_addr}"),
532                    format!("--http-addr={http_addr}"),
533                ]);
534            }
535        }
536
537        args
538    }
539}