use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Display;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use client::error::ServerSnafu;
use client::{
    Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database as DB, Error as ClientError,
};
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use mysql::prelude::Queryable;
use mysql::{Conn as MySqlClient, Row as MySqlRow};
use sqlness::{Database, EnvController, QueryContext};
use tokio::sync::Mutex as TokioMutex;
use tokio_postgres::{Client as PgClient, SimpleQueryMessage as PgRow};

use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
use crate::server_mode::ServerMode;
use crate::util::{PROGRAM, get_workspace_root, maybe_pull_binary};
use crate::{ServerAddr, util};

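// Indices into `GreptimeDBContext::server_modes`. A standalone instance stores its
// single mode at index 0; a distributed cluster stores the metasrv at index 0,
// the three datanodes at indices 1-3, the frontend at 4, and the flownode at 5.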
const SERVER_MODE_STANDALONE_IDX: usize = 0;
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;

#[derive(Clone)]
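/// WAL configuration for the servers under test: the local raft-engine or a
/// Kafka remote WAL (optionally backed by a Kafka cluster this harness starts).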
pub enum WalConfig {
    RaftEngine,
    Kafka {
        needs_kafka_cluster: bool,
        broker_endpoints: Vec<String>,
    },
}

#[derive(Clone)]
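/// Metadata store configuration: the store addresses, plus whether this harness
/// should start its own etcd, Postgres, or MySQL instance.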
pub struct StoreConfig {
    pub store_addrs: Vec<String>,
    pub setup_etcd: bool,
    pub setup_pg: bool,
    pub setup_mysql: bool,
}

#[derive(Clone)]
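/// The sqlness environment controller: tracks the data home, server addresses,
/// WAL and metadata store settings, and the binaries used to launch the servers.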
pub struct Env {
    sqlness_home: PathBuf,
    server_addrs: ServerAddr,
    wal: WalConfig,

    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    pull_version_on_need: bool,
    store_config: StoreConfig,
}

#[async_trait]
impl EnvController for Env {
    type DB = GreptimeDB;

    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
        if self.server_addrs.server_addr.is_some() && id > 0 {
            panic!("Parallel test mode is not supported when server address is already set.");
        }

        unsafe {
            std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
        }
        match mode {
            "standalone" => self.start_standalone(id).await,
            "distributed" => self.start_distributed(id).await,
            _ => panic!("Unexpected mode: {mode}"),
        }
    }

    async fn stop(&self, _mode: &str, mut database: Self::DB) {
        database.stop();
    }
}

impl Env {
    pub fn new(
        data_home: PathBuf,
        server_addrs: ServerAddr,
        wal: WalConfig,
        pull_version_on_need: bool,
        bins_dir: Option<PathBuf>,
        store_config: StoreConfig,
    ) -> Self {
        Self {
            sqlness_home: data_home,
            server_addrs,
            wal,
            pull_version_on_need,
            bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
            versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
                "latest".to_string(),
                bins_dir.unwrap_or_else(|| util::get_binary_dir("debug")),
            )]))),
            store_config,
        }
    }

    async fn start_standalone(&self, id: usize) -> GreptimeDB {
        println!("Starting standalone instance id: {id}");

        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let server_mode = ServerMode::random_standalone();
            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
            let server_addr = server_mode.server_addr().unwrap();
            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
            greptimedb.is_standalone = true;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn start_distributed(&self, id: usize) -> GreptimeDB {
        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            self.setup_etcd();
            self.setup_pg();
            self.setup_mysql().await;
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let meta_server_mode = ServerMode::random_metasrv();
            let metasrv_port = match &meta_server_mode {
                ServerMode::Metasrv {
                    rpc_server_addr, ..
                } => rpc_server_addr
                    .split(':')
                    .nth(1)
                    .unwrap()
                    .parse::<u16>()
                    .unwrap(),
                _ => panic!(
                    "metasrv mode not set, maybe running in remote mode which doesn't support restart?"
                ),
            };
            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;

            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;

            let frontend_mode = ServerMode::random_frontend(metasrv_port);
            let server_addr = frontend_mode.server_addr().unwrap();
            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;

            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;

            greptimedb.metasrv_process = Some(meta_server).into();
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
                datanode_1, datanode_2, datanode_3,
            ])));
            greptimedb.frontend_process = Some(frontend).into();
            greptimedb.flownode_process = Some(flownode).into();
            greptimedb.is_standalone = false;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn create_pg_client(&self, pg_server_addr: &str) -> PgClient {
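        // Try to connect a few times with exponential backoff, since the server
        // may still be starting up when this is first called.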
        let sockaddr: SocketAddr = pg_server_addr.parse().expect(
            "Failed to parse the Postgres server address. Please check if the address is in the format of `ip:port`.",
        );
        let mut config = tokio_postgres::config::Config::new();
        config.host(sockaddr.ip().to_string());
        config.port(sockaddr.port());
        config.dbname(DEFAULT_SCHEMA_NAME);

        const MAX_RETRY: usize = 3;
        let mut backoff = Duration::from_millis(500);
        for _ in 0..MAX_RETRY {
            if let Ok((pg_client, conn)) = config.connect(tokio_postgres::NoTls).await {
                tokio::spawn(conn);
                return pg_client;
            }
            tokio::time::sleep(backoff).await;
            backoff *= 2;
        }
        panic!("Failed to connect to Postgres server. Please check if the server is running.");
    }

    async fn create_mysql_client(&self, mysql_server_addr: &str) -> MySqlClient {
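        // Same retry-with-backoff scheme as `create_pg_client`.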
        let sockaddr: SocketAddr = mysql_server_addr.parse().expect(
            "Failed to parse the MySQL server address. Please check if the address is in the format of `ip:port`.",
        );
        let ops = mysql::OptsBuilder::new()
            .ip_or_hostname(Some(sockaddr.ip().to_string()))
            .tcp_port(sockaddr.port())
            .db_name(Some(DEFAULT_SCHEMA_NAME));
        const MAX_RETRY: usize = 3;
        let mut backoff = Duration::from_millis(500);

        for _ in 0..MAX_RETRY {
            if let Ok(client) = mysql::Conn::new(ops.clone()) {
                return client;
            }
            tokio::time::sleep(backoff).await;
            backoff *= 2;
        }

        panic!("Failed to connect to MySQL server. Please check if the server is running.")
    }

    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
        let grpc_server_addr = server_addr.server_addr.clone().unwrap();
        let pg_server_addr = server_addr.pg_server_addr.clone().unwrap();
        let mysql_server_addr = server_addr.mysql_server_addr.clone().unwrap();

        let grpc_client = Client::with_urls(vec![grpc_server_addr.clone()]);
        let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
        let pg_client = self.create_pg_client(&pg_server_addr).await;
        let mysql_client = self.create_mysql_client(&mysql_server_addr).await;

        GreptimeDB {
            grpc_client: TokioMutex::new(db),
            pg_client: TokioMutex::new(pg_client),
            mysql_client: TokioMutex::new(mysql_client),
            server_processes: None,
            metasrv_process: None.into(),
            frontend_process: None.into(),
            flownode_process: None.into(),
            ctx: GreptimeDBContext {
                time: 0,
                datanode_id: Default::default(),
                wal: self.wal.clone(),
                store_config: self.store_config.clone(),
                server_modes: Vec::new(),
            },
            is_standalone: false,
            env: self.clone(),
            id,
        }
    }

    fn stop_server(process: &mut Child) {
        let _ = process.kill();
        let _ = process.wait();
    }

    async fn start_server(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
    ) -> Child {
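        // Pick a per-instance log file, make sure the target ports are free, spawn
        // the binary, then wait until its ports become reachable.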
        let log_file_name = match mode {
            ServerMode::Datanode { node_id, .. } => {
                db_ctx.incr_datanode_id();
                format!("greptime-{id}-sqlness-datanode-{node_id}.log")
            }
            ServerMode::Flownode { .. } => format!("greptime-{id}-sqlness-flownode.log"),
            ServerMode::Frontend { .. } => format!("greptime-{id}-sqlness-frontend.log"),
            ServerMode::Metasrv { .. } => format!("greptime-{id}-sqlness-metasrv.log"),
            ServerMode::Standalone { .. } => format!("greptime-{id}-sqlness-standalone.log"),
        };
        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();

        println!("DB instance {id} log file at {stdout_file_name}");

        let stdout_file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(truncate_log)
            .append(!truncate_log)
            .open(stdout_file_name)
            .unwrap();

        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
        let check_ip_addrs = mode.check_addrs();

        for check_ip_addr in &check_ip_addrs {
            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
                panic!("Port {check_ip_addr} is already in use, please check and retry.");
            }
        }

        let program = PROGRAM;

        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
        );

        let abs_bins_dir = bins_dir
            .canonicalize()
            .expect("Failed to canonicalize bins_dir");

        let mut process = Command::new(abs_bins_dir.join(program))
            .current_dir(bins_dir.clone())
            .env("TZ", "UTC")
            .args(args)
            .stdout(stdout_file)
            .spawn()
            .unwrap_or_else(|error| {
                panic!(
                    "Failed to start the DB with subcommand {}, error: {error}, path: {:?}",
                    mode.name(),
                    bins_dir.join(program)
                );
            });

        for check_ip_addr in &check_ip_addrs {
            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
                Env::stop_server(&mut process);
                panic!("{} didn't start up within 10 seconds, quitting.", mode.name())
            }
        }

        process
    }

    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
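        // Stop the running processes first: the datanodes (or the standalone server)
        // and the flownode always; the metasrv and frontend only on a full restart.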
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        let new_server_processes = if db.is_standalone {
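            // Restart the standalone server with its recorded mode, then reconnect
            // the Postgres and MySQL clients.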
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;

            *db.pg_client.lock().await = self
                .create_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            *db.mysql_client.lock().await = self
                .create_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

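                // Give the restarted metasrv some time to become ready before
                // bringing the datanodes back up.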
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server(datanode_mode, &db.ctx, db.id, false)
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server(frontend_mode, &db.ctx, db.id, false)
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server(flownode_mode, &db.ctx, db.id, false)
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }

    fn setup_wal(&self) {
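        // Start a Kafka cluster if the Kafka WAL configuration asks for one.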
        if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
            util::setup_wal();
        }
    }

    fn setup_etcd(&self) {
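        // Start an etcd cluster on the configured client ports when requested.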
        if self.store_config.setup_etcd {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            util::setup_etcd(client_ports, None, None);
        }
    }

    fn setup_pg(&self) {
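        // Start a Postgres metadata store when requested, defaulting to port 5432
        // if no store address is configured.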
        if self.store_config.setup_pg {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&5432);
            util::setup_pg(*client_port, None);
        }
    }

    async fn setup_mysql(&self) {
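        // Start a MySQL metadata store when requested, defaulting to port 3306
        // if no store address is configured.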
        if self.store_config.setup_mysql {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&3306);
            util::setup_mysql(*client_port, None);

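            // Give the MySQL server some time to become ready before returning.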
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }

    fn build_db(&self) {
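        // Nothing to build if a directory with pre-built binaries was supplied.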
        if self.bins_dir.lock().unwrap().is_some() {
            return;
        }

        println!("Going to build the DB...");
        let output = Command::new("cargo")
            .current_dir(util::get_workspace_root())
            .args([
                "build",
                "--bin",
                "greptime",
                "--features",
                "pg_kvbackend,mysql_kvbackend",
            ])
            .output()
            .expect("Failed to run cargo build");
        if !output.status.success() {
            println!("Failed to build GreptimeDB, {}", output.status);
            println!("Cargo build stdout:");
            io::stdout().write_all(&output.stdout).unwrap();
            println!("Cargo build stderr:");
            io::stderr().write_all(&output.stderr).unwrap();
            panic!();
        }

        let _ = self
            .bins_dir
            .lock()
            .unwrap()
            .insert(util::get_binary_dir("debug"));
    }
}

pub struct GreptimeDB {
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    metasrv_process: Mutex<Option<Child>>,
    frontend_process: Mutex<Option<Child>>,
    flownode_process: Mutex<Option<Child>>,
    grpc_client: TokioMutex<DB>,
    pg_client: TokioMutex<PgClient>,
    mysql_client: TokioMutex<MySqlClient>,
    ctx: GreptimeDBContext,
    is_standalone: bool,
    env: Env,
    id: usize,
}

impl GreptimeDB {
    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let client = self.pg_client.lock().await;
        match client.simple_query(&query).await {
            Ok(rows) => Box::new(PostgresqlFormatter { rows }),
            Err(e) => Box::new(format!("Failed to execute query, encountered: {e:?}")),
        }
    }

    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut conn = self.mysql_client.lock().await;
        let result = conn.query_iter(query);
        Box::new(match result {
            Ok(result) => {
                let mut rows = vec![];
                let affected_rows = result.affected_rows();
                for row in result {
                    match row {
                        Ok(r) => rows.push(r),
                        Err(e) => {
                            return Box::new(format!("Failed to parse query result, err: {e:?}"));
                        }
                    }
                }

                if rows.is_empty() {
                    format!("affected_rows: {affected_rows}")
                } else {
                    format!("{}", MysqlFormatter { rows })
                }
            }
            Err(e) => format!("Failed to execute query, err: {e:?}"),
        })
    }

    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.grpc_client.lock().await;

        let query_str = query.trim().to_lowercase();

        if query_str.starts_with("use ") {
            let database = query
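                // `USE <database>;`: the second token is the database name.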
                .split_ascii_whitespace()
                .nth(1)
                .expect("Illegal `USE` statement: expecting a database.")
                .trim_end_matches(';');
            client.set_schema(database);
            Box::new(ResultDisplayer {
                result: Ok(Output::new_with_affected_rows(0)),
            }) as _
        } else if query_str.starts_with("set time_zone")
            || query_str.starts_with("set session time_zone")
            || query_str.starts_with("set local time_zone")
        {
            let timezone = query
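                // `SET TIME_ZONE = '<tz>';`: take the single-quoted value after `=`.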
                .split('=')
                .nth(1)
                .expect("Illegal `SET TIMEZONE` statement: expecting a timezone expr.")
                .trim()
                .strip_prefix('\'')
                .unwrap()
                .strip_suffix("';")
                .unwrap();

            client.set_timezone(timezone);

            Box::new(ResultDisplayer {
                result: Ok(Output::new_with_affected_rows(0)),
            }) as _
        } else {
            let mut result = client.sql(&query).await;
            if let Ok(Output {
                data: OutputData::Stream(stream),
                ..
            }) = result
            {
                match RecordBatches::try_collect(stream).await {
                    Ok(recordbatches) => {
                        result = Ok(Output::new_with_record_batches(recordbatches));
                    }
                    Err(e) => {
                        let status_code = e.status_code();
                        let msg = e.output_msg();
                        result = ServerSnafu {
                            code: status_code,
                            msg,
                        }
                        .fail();
                    }
                }
            }
            Box::new(ResultDisplayer { result }) as _
        }
    }
}

#[async_trait]
impl Database for GreptimeDB {
    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
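        // A `restart` key in the query context restarts the managed servers in place;
        // a `version` key switches to the requested binary version (pulling it if
        // needed and allowed) and then does a full restart.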
        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
            self.env.restart_server(self, false).await;
        } else if let Some(version) = ctx.context.get("version") {
            let version_bin_dir = self
                .env
                .versioned_bins_dirs
                .lock()
                .expect("lock poison")
                .get(version.as_str())
                .cloned();

            match version_bin_dir {
                Some(path) if path.join(PROGRAM).is_file() => {
                    *self.env.bins_dir.lock().unwrap() = Some(path);
                }
                _ => {
                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
                    let root = get_workspace_root();
                    let new_path = PathBuf::from_iter([&root, version]);
                    *self.env.bins_dir.lock().unwrap() = Some(new_path);
                }
            }

            self.env.restart_server(self, true).await;
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
            if protocol == MYSQL {
                self.mysql_query(ctx, query).await
            } else {
                self.postgres_query(ctx, query).await
            }
        } else {
            self.grpc_query(ctx, query).await
        }
    }
}

impl GreptimeDB {
    fn stop(&mut self) {
        if let Some(server_processes) = self.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            for mut server_process in server_processes.drain(..) {
                Env::stop_server(&mut server_process);
                println!(
                    "Standalone or Datanode (pid = {}) is stopped",
                    server_process.id()
                );
            }
        }
        if let Some(mut metasrv) = self
            .metasrv_process
            .lock()
            .expect("someone else panic when holding lock")
            .take()
        {
            Env::stop_server(&mut metasrv);
            println!("Metasrv (pid = {}) is stopped", metasrv.id());
        }
        if let Some(mut frontend) = self
            .frontend_process
            .lock()
            .expect("someone else panic when holding lock")
            .take()
        {
            Env::stop_server(&mut frontend);
            println!("Frontend (pid = {}) is stopped", frontend.id());
        }
        if let Some(mut flownode) = self
            .flownode_process
            .lock()
            .expect("someone else panic when holding lock")
            .take()
        {
            Env::stop_server(&mut flownode);
            println!("Flownode (pid = {}) is stopped", flownode.id());
        }
        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
        {
            util::teardown_wal();
        }
    }
}

impl Drop for GreptimeDB {
    fn drop(&mut self) {
        if self.env.server_addrs.server_addr.is_none() {
            self.stop();
        }
    }
}

pub struct GreptimeDBContext {
    time: i64,
    datanode_id: AtomicU32,
    wal: WalConfig,
    store_config: StoreConfig,
    server_modes: Vec<ServerMode>,
}

impl GreptimeDBContext {
    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
        Self {
            time: common_time::util::current_time_millis(),
            datanode_id: AtomicU32::new(0),
            wal,
            store_config,
            server_modes: Vec::new(),
        }
    }

    pub(crate) fn time(&self) -> i64 {
        self.time
    }

    pub fn is_raft_engine(&self) -> bool {
        matches!(self.wal, WalConfig::RaftEngine)
    }

    pub fn kafka_wal_broker_endpoints(&self) -> String {
        match &self.wal {
            WalConfig::RaftEngine => String::new(),
            WalConfig::Kafka {
                broker_endpoints, ..
            } => serde_json::to_string(&broker_endpoints).unwrap(),
        }
    }

    fn incr_datanode_id(&self) {
        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
    }

    fn reset_datanode_id(&self) {
        self.datanode_id.store(0, Ordering::Relaxed);
    }

    pub(crate) fn store_config(&self) -> StoreConfig {
        self.store_config.clone()
    }

    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
        if idx >= self.server_modes.len() {
            self.server_modes.resize(idx + 1, mode.clone());
        }
        self.server_modes[idx] = mode;
    }

    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
        self.server_modes.get(idx)
    }
}

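/// Displays a gRPC query result (or error) in the stable textual form that the
/// sqlness result files expect.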
struct ResultDisplayer {
    result: Result<Output, ClientError>,
}

impl Display for ResultDisplayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.result {
            Ok(result) => match &result.data {
                OutputData::AffectedRows(rows) => {
                    write!(f, "Affected Rows: {rows}")
                }
                OutputData::RecordBatches(recordbatches) => {
                    let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
                    match pretty {
                        Ok(s) => write!(f, "{s}"),
                        Err(e) => {
                            write!(f, "Failed to pretty format {recordbatches:?}, error: {e}")
                        }
                    }
                }
                OutputData::Stream(_) => unreachable!(),
            },
            Err(e) => {
                let status_code = e.status_code();
                let root_cause = e.output_msg();
                write!(
                    f,
                    "Error: {}({status_code}), {root_cause}",
                    status_code as u32
                )
            }
        }
    }
}

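/// Formats the messages of a Postgres simple query as a table, treating every
/// column as a string.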
struct PostgresqlFormatter {
    pub rows: Vec<PgRow>,
}

impl Display for PostgresqlFormatter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.rows.is_empty() {
            return f.write_str("(Empty response)");
        }

        let schema = match &self.rows[0] {
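            // The first message is either a command completion (for statements) or
            // the row description carrying the column schema.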
            PgRow::CommandComplete(affected_rows) => {
                write!(
                    f,
                    "{}",
                    ResultDisplayer {
                        result: Ok(Output::new_with_affected_rows(*affected_rows as usize)),
                    }
                )?;
                return Ok(());
            }
            PgRow::RowDescription(desc) => Arc::new(Schema::new(
                desc.iter()
                    .map(|column| {
                        ColumnSchema::new(column.name(), ConcreteDataType::string_datatype(), false)
                    })
                    .collect(),
            )),
            _ => unreachable!(),
        };
        if schema.num_columns() == 0 {
            return Ok(());
        }

        let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
            .map(|_| StringVectorBuilder::with_capacity(self.rows.len()))
            .collect();
        for row in self.rows.iter().skip(1) {
            if let PgRow::Row(row) = row {
                for (i, column) in columns.iter_mut().enumerate().take(schema.num_columns()) {
                    column.push(row.get(i));
                }
            }
        }
        let columns: Vec<VectorRef> = columns
            .into_iter()
            .map(|mut col| Arc::new(col.finish()) as VectorRef)
            .collect();

        let recordbatches = RecordBatches::try_from_columns(schema, columns)
            .expect("Failed to construct recordbatches from columns. Please check the schema.");
        let result_displayer = ResultDisplayer {
            result: Ok(Output::new_with_record_batches(recordbatches)),
        };
        write!(f, "{}", result_displayer)?;

        Ok(())
    }
}

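/// Formats MySQL query results as a table, treating every column as a string.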
struct MysqlFormatter {
    pub rows: Vec<MySqlRow>,
}

impl Display for MysqlFormatter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.rows.is_empty() {
            return f.write_str("(Empty response)");
        }
        let first_row = &self.rows[0];
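        // Column names come from the metadata attached to the first row.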
        let columns_binding = first_row.columns();
        let names = columns_binding
            .iter()
            .map(|column| column.name_str())
            .collect::<Vec<Cow<str>>>();
        let schema = Arc::new(Schema::new(
            names
                .iter()
                .map(|name| {
                    ColumnSchema::new(name.to_string(), ConcreteDataType::string_datatype(), false)
                })
                .collect(),
        ));

        let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
            .map(|_| StringVectorBuilder::with_capacity(self.rows.len()))
            .collect();
        for row in self.rows.iter() {
            for (i, name) in names.iter().enumerate() {
                columns[i].push(row.get::<String, &str>(name).as_deref());
            }
        }
        let columns: Vec<VectorRef> = columns
            .into_iter()
            .map(|mut col| Arc::new(col.finish()) as VectorRef)
            .collect();

        let recordbatches = RecordBatches::try_from_columns(schema, columns)
            .expect("Failed to construct recordbatches from columns. Please check the schema.");
        let result_displayer = ResultDisplayer {
            result: Ok(Output::new_with_record_batches(recordbatches)),
        };
        write!(f, "{}", result_displayer)?;

        Ok(())
    }
}