use std::collections::HashMap;
use std::fmt::Display;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use sqlness::{Database, EnvController, QueryContext};
use tokio::sync::Mutex as TokioMutex;

use crate::client::MultiProtocolClient;
use crate::cmd::bare::ServerAddr;
use crate::formatter::{ErrorFormatter, MysqlFormatter, OutputFormatter, PostgresqlFormatter};
use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
use crate::server_mode::ServerMode;
use crate::util;
use crate::util::{PROGRAM, get_workspace_root, maybe_pull_binary};

/// Index of the standalone server mode in `GreptimeDBContext::server_modes`.
const SERVER_MODE_STANDALONE_IDX: usize = 0;
/// Indices of the distributed components in `GreptimeDBContext::server_modes`:
/// the metasrv, then three datanodes, then the frontend and the flownode.
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;

/// WAL configuration used by the GreptimeDB instances under test.
#[derive(Clone)]
pub enum WalConfig {
    RaftEngine,
    Kafka {
        /// Whether a Kafka cluster needs to be set up before the test run.
        needs_kafka_cluster: bool,
        /// Kafka broker endpoints passed to the server.
        broker_endpoints: Vec<String>,
    },
}

/// How a backing service (e.g. the PostgreSQL or MySQL kv backend) is provided:
/// created and managed by the test runner, or an externally managed instance
/// reachable at the given address.
#[derive(Debug, Clone)]
pub(crate) enum ServiceProvider {
    Create,
    External(String),
}

impl From<&str> for ServiceProvider {
    fn from(value: &str) -> Self {
        if value.is_empty() {
            Self::Create
        } else {
            Self::External(value.to_string())
        }
    }
}

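// Illustrative sketch (not part of the original runner) of how the conversion above is
// expected to be used when wiring CLI values into `StoreConfig`: an empty string requests
// a runner-managed service, anything else is taken as the address of an external one.
//
//     let managed: ServiceProvider = "".into();                 // ServiceProvider::Create
//     let external: ServiceProvider = "127.0.0.1:5432".into();  // ServiceProvider::External(..)
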
/// Configuration of the metadata store backing the servers.
#[derive(Clone)]
pub struct StoreConfig {
    pub store_addrs: Vec<String>,
    pub setup_etcd: bool,
    pub(crate) setup_pg: Option<ServiceProvider>,
    pub(crate) setup_mysql: Option<ServiceProvider>,
}

#[derive(Clone)]
pub struct Env {
    sqlness_home: PathBuf,
    server_addrs: ServerAddr,
    wal: WalConfig,

    /// Path to the directory that contains the GreptimeDB binary currently in use.
    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    /// Maps a version string (e.g. "latest") to the directory holding that version's binary.
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// Whether to pull a versioned binary on demand when it is not present locally.
    pull_version_on_need: bool,
    /// Configuration of the metadata store.
    store_config: StoreConfig,
    /// Extra arguments appended when starting the servers.
    extra_args: Vec<String>,
}

#[async_trait]
impl EnvController for Env {
    type DB = GreptimeDB;

    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
        if self.server_addrs.server_addr.is_some() && id > 0 {
            panic!("Parallel test mode is not supported when server address is already set.");
        }

        unsafe {
            std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
        }
        match mode {
            "standalone" => self.start_standalone(id).await,
            "distributed" => self.start_distributed(id).await,
            _ => panic!("Unexpected mode: {mode}"),
        }
    }

    /// Stops one [`GreptimeDB`] instance.
    async fn stop(&self, _mode: &str, mut database: Self::DB) {
        database.stop();
    }
}

impl Env {
    pub fn new(
        data_home: PathBuf,
        server_addrs: ServerAddr,
        wal: WalConfig,
        pull_version_on_need: bool,
        bins_dir: Option<PathBuf>,
        store_config: StoreConfig,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            sqlness_home: data_home,
            server_addrs,
            wal,
            pull_version_on_need,
            bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
            versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
                "latest".to_string(),
                bins_dir.clone().unwrap_or(util::get_binary_dir("debug")),
            )]))),
            store_config,
            extra_args,
        }
    }

    async fn start_standalone(&self, id: usize) -> GreptimeDB {
        println!("Starting standalone instance id: {id}");

        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let server_mode = ServerMode::random_standalone();
            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
            let server_addr = server_mode.server_addr().unwrap();
            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
            greptimedb.is_standalone = true;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn start_distributed(&self, id: usize) -> GreptimeDB {
        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            self.setup_etcd();
            self.setup_pg();
            self.setup_mysql().await;
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let meta_server_mode = ServerMode::random_metasrv();
            // Extract the metasrv RPC port so the other components can be pointed at it.
            let metasrv_port = match &meta_server_mode {
                ServerMode::Metasrv {
                    rpc_server_addr, ..
                } => rpc_server_addr
                    .split(':')
                    .nth(1)
                    .unwrap()
                    .parse::<u16>()
                    .unwrap(),
                _ => panic!(
                    "metasrv mode not set, maybe running in remote mode which doesn't support restart?"
                ),
            };
            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;

            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;

            let frontend_mode = ServerMode::random_frontend(metasrv_port);
            let server_addr = frontend_mode.server_addr().unwrap();
            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;

            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;

            greptimedb.metasrv_process = Some(meta_server).into();
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
                datanode_1, datanode_2, datanode_3,
            ])));
            greptimedb.frontend_process = Some(frontend).into();
            greptimedb.flownode_process = Some(flownode).into();
            greptimedb.is_standalone = false;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
        let grpc_server_addr = server_addr.server_addr.as_ref().unwrap();
        let pg_server_addr = server_addr.pg_server_addr.as_ref().unwrap();
        let mysql_server_addr = server_addr.mysql_server_addr.as_ref().unwrap();

        let client =
            MultiProtocolClient::connect(grpc_server_addr, pg_server_addr, mysql_server_addr).await;
        GreptimeDB {
            client: TokioMutex::new(client),
            server_processes: None,
            metasrv_process: None.into(),
            frontend_process: None.into(),
            flownode_process: None.into(),
            ctx: GreptimeDBContext {
                time: 0,
                datanode_id: Default::default(),
                wal: self.wal.clone(),
                store_config: self.store_config.clone(),
                server_modes: Vec::new(),
            },
            is_standalone: false,
            env: self.clone(),
            id,
        }
    }

    fn stop_server(process: &mut Child) {
        let _ = process.kill();
        let _ = process.wait();
    }

    async fn start_server(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
    ) -> Child {
        let log_file_name = match mode {
            ServerMode::Datanode { node_id, .. } => {
                db_ctx.incr_datanode_id();
                format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
            }
            ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
            ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
            ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
            ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
        };
        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();

        println!("DB instance {id} log file at {stdout_file_name}");

        let stdout_file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(truncate_log)
            .append(!truncate_log)
            .open(stdout_file_name)
            .unwrap();

        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
        let check_ip_addrs = mode.check_addrs();

        // Make sure the ports we are about to bind are free.
        for check_ip_addr in &check_ip_addrs {
            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
                panic!("Port {check_ip_addr} is already in use, please check and retry.");
            }
        }

        let program = PROGRAM;

        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
        );

        let abs_bins_dir = bins_dir
            .canonicalize()
            .expect("Failed to canonicalize bins_dir");

        let mut process = Command::new(abs_bins_dir.join(program))
            .current_dir(bins_dir.clone())
            .env("TZ", "UTC")
            .args(args)
            .stdout(stdout_file)
            .spawn()
            .unwrap_or_else(|error| {
                panic!(
                    "Failed to start the DB with subcommand {}, error: {error}, path: {:?}",
                    mode.name(),
                    bins_dir.join(program)
                );
            });

        // Wait until the server is reachable on all of its advertised addresses.
        for check_ip_addr in &check_ip_addrs {
            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
                Env::stop_server(&mut process);
                panic!("{} didn't come up within 10 seconds, quitting.", mode.name())
            }
        }

        process
    }

    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
        // Stop the running servers first; the block scopes the process locks.
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        let new_server_processes = if db.is_standalone {
            // Standalone mode: restart the single server and reconnect the clients.
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;

            let mut client = db.client.lock().await;
            client
                .reconnect_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            client
                .reconnect_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

                // Give the metasrv some time to become ready before starting the other components.
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server(datanode_mode, &db.ctx, db.id, false)
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server(frontend_mode, &db.ctx, db.id, false)
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server(flownode_mode, &db.ctx, db.id, false)
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }

    fn setup_wal(&self) {
        // Start a Kafka cluster only if the Kafka WAL config asks for one.
        if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
            util::setup_wal();
        }
    }

    fn setup_etcd(&self) {
        // Start an etcd cluster on the configured store ports if requested.
        if self.store_config.setup_etcd {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            util::setup_etcd(client_ports, None, None);
        }
    }

    fn setup_pg(&self) {
        // Start a PostgreSQL instance if the runner is asked to create one.
        if matches!(self.store_config.setup_pg, Some(ServiceProvider::Create)) {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&5432);
            util::setup_pg(*client_port, None);
        }
    }

    async fn setup_mysql(&self) {
        // Start a MySQL instance if the runner is asked to create one.
        if matches!(self.store_config.setup_mysql, Some(ServiceProvider::Create)) {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&3306);
            util::setup_mysql(*client_port, None);

            // Give MySQL some time to become ready before it is used as a backend.
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }

    fn build_db(&self) {
        // Skip the build if a binary directory was already provided or built.
        if self.bins_dir.lock().unwrap().is_some() {
            return;
        }

        println!("Going to build the DB...");
        let output = Command::new("cargo")
            .current_dir(util::get_workspace_root())
            .args([
                "build",
                "--bin",
                "greptime",
                "--features",
                "pg_kvbackend,mysql_kvbackend",
            ])
            .output()
            .expect("Failed to execute `cargo build`");
        if !output.status.success() {
            println!("Failed to build GreptimeDB, {}", output.status);
            println!("Cargo build stdout:");
            io::stdout().write_all(&output.stdout).unwrap();
            println!("Cargo build stderr:");
            io::stderr().write_all(&output.stderr).unwrap();
            panic!();
        }

        let _ = self
            .bins_dir
            .lock()
            .unwrap()
            .insert(util::get_binary_dir("debug"));
    }

    pub(crate) fn extra_args(&self) -> &Vec<String> {
        &self.extra_args
    }
}

pub struct GreptimeDB {
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    metasrv_process: Mutex<Option<Child>>,
    frontend_process: Mutex<Option<Child>>,
    flownode_process: Mutex<Option<Child>>,
    client: TokioMutex<MultiProtocolClient>,
    ctx: GreptimeDBContext,
    is_standalone: bool,
    env: Env,
    id: usize,
}

impl GreptimeDB {
    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.postgres_query(&query).await {
            Ok(rows) => Box::new(PostgresqlFormatter::from(rows)),
            Err(e) => Box::new(e),
        }
    }

    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.mysql_query(&query).await {
            Ok(res) => Box::new(MysqlFormatter::from(res)),
            Err(e) => Box::new(e),
        }
    }

    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.grpc_query(&query).await {
            Ok(rows) => Box::new(OutputFormatter::from(rows)),
            Err(e) => Box::new(ErrorFormatter::from(e)),
        }
    }
}

#[async_trait]
impl Database for GreptimeDB {
    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
            self.env.restart_server(self, false).await;
        } else if let Some(version) = ctx.context.get("version") {
            let version_bin_dir = self
                .env
                .versioned_bins_dirs
                .lock()
                .expect("lock poison")
                .get(version.as_str())
                .cloned();

            match version_bin_dir {
                Some(path) if path.clone().join(PROGRAM).is_file() => {
                    // The requested version is already available locally; use it directly.
                    *self.env.bins_dir.lock().unwrap() = Some(path.clone());
                }
                _ => {
                    // Pull the binary if needed and point the binary dir at it.
                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
                    let root = get_workspace_root();
                    let new_path = PathBuf::from_iter([&root, version]);
                    *self.env.bins_dir.lock().unwrap() = Some(new_path);
                }
            }

            self.env.restart_server(self, true).await;
            // Give the servers some time to settle after a full restart.
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
            // A protocol interceptor has tagged this query with the client protocol to use.
            if protocol == MYSQL {
                self.mysql_query(ctx, query).await
            } else {
                self.postgres_query(ctx, query).await
            }
        } else {
            self.grpc_query(ctx, query).await
        }
    }
}

impl GreptimeDB {
    fn stop(&mut self) {
        if let Some(server_processes) = self.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            for mut server_process in server_processes.drain(..) {
                Env::stop_server(&mut server_process);
                println!(
                    "Standalone or Datanode (pid = {}) is stopped",
                    server_process.id()
                );
            }
        }
        if let Some(mut metasrv) = self
            .metasrv_process
            .lock()
            .expect("another thread panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut metasrv);
            println!("Metasrv (pid = {}) is stopped", metasrv.id());
        }
        if let Some(mut frontend) = self
            .frontend_process
            .lock()
            .expect("another thread panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut frontend);
            println!("Frontend (pid = {}) is stopped", frontend.id());
        }
        if let Some(mut flownode) = self
            .flownode_process
            .lock()
            .expect("another thread panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut flownode);
            println!("Flownode (pid = {}) is stopped", flownode.id());
        }
        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
        {
            util::teardown_wal();
        }
    }
}

impl Drop for GreptimeDB {
    fn drop(&mut self) {
        if self.env.server_addrs.server_addr.is_none() {
            self.stop();
        }
    }
}

pub struct GreptimeDBContext {
    /// Start time of this test run, in milliseconds.
    time: i64,
    /// Counter of datanodes started so far; reset on restart.
    datanode_id: AtomicU32,
    wal: WalConfig,
    store_config: StoreConfig,
    server_modes: Vec<ServerMode>,
}

impl GreptimeDBContext {
    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
        Self {
            time: common_time::util::current_time_millis(),
            datanode_id: AtomicU32::new(0),
            wal,
            store_config,
            server_modes: Vec::new(),
        }
    }

    pub(crate) fn time(&self) -> i64 {
        self.time
    }

    pub fn is_raft_engine(&self) -> bool {
        matches!(self.wal, WalConfig::RaftEngine)
    }

    pub fn kafka_wal_broker_endpoints(&self) -> String {
        match &self.wal {
            WalConfig::RaftEngine => String::new(),
            WalConfig::Kafka {
                broker_endpoints, ..
            } => serde_json::to_string(&broker_endpoints).unwrap(),
        }
    }

    fn incr_datanode_id(&self) {
        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
    }

    fn reset_datanode_id(&self) {
        self.datanode_id.store(0, Ordering::Relaxed);
    }

    pub(crate) fn store_config(&self) -> StoreConfig {
        self.store_config.clone()
    }

    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
        if idx >= self.server_modes.len() {
            self.server_modes.resize(idx + 1, mode.clone());
        }
        self.server_modes[idx] = mode;
    }

    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
        self.server_modes.get(idx)
    }
}
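
// A minimal sketch of unit tests for the pure helpers in this module: the `ServiceProvider`
// conversion and the WAL accessors on `GreptimeDBContext`. It assumes only items defined
// above plus the `serde_json` and `common_time` crates already used in this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn service_provider_from_str() {
        // An empty value asks the runner to create and manage the service itself.
        assert!(matches!(ServiceProvider::from(""), ServiceProvider::Create));
        // A non-empty value is treated as the address of an externally managed service.
        assert!(matches!(
            ServiceProvider::from("127.0.0.1:5432"),
            ServiceProvider::External(addr) if addr == "127.0.0.1:5432"
        ));
    }

    #[test]
    fn wal_accessors_on_context() {
        let store_config = StoreConfig {
            store_addrs: vec![],
            setup_etcd: false,
            setup_pg: None,
            setup_mysql: None,
        };

        // Raft-engine WAL: no Kafka brokers to report.
        let ctx = GreptimeDBContext::new(WalConfig::RaftEngine, store_config.clone());
        assert!(ctx.is_raft_engine());
        assert_eq!(ctx.kafka_wal_broker_endpoints(), "");

        // Kafka WAL: broker endpoints are serialized as a JSON array.
        let ctx = GreptimeDBContext::new(
            WalConfig::Kafka {
                needs_kafka_cluster: false,
                broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            },
            store_config,
        );
        assert!(!ctx.is_raft_engine());
        assert_eq!(ctx.kafka_wal_broker_endpoints(), r#"["127.0.0.1:9092"]"#);
    }
}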