1use std::borrow::Cow;
16use std::collections::HashMap;
17use std::fmt::Display;
18use std::fs::OpenOptions;
19use std::io;
20use std::io::Write;
21use std::net::SocketAddr;
22use std::path::{Path, PathBuf};
23use std::process::{Child, Command};
24use std::sync::atomic::{AtomicU32, Ordering};
25use std::sync::{Arc, Mutex};
26use std::time::Duration;
27
28use async_trait::async_trait;
29use client::error::ServerSnafu;
30use client::{
31 Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
32};
33use common_error::ext::ErrorExt;
34use common_query::{Output, OutputData};
35use common_recordbatch::RecordBatches;
36use datatypes::data_type::ConcreteDataType;
37use datatypes::scalars::ScalarVectorBuilder;
38use datatypes::schema::{ColumnSchema, Schema};
39use datatypes::vectors::{StringVectorBuilder, VectorRef};
40use mysql::prelude::Queryable;
41use mysql::{Conn as MySqlClient, Row as MySqlRow};
42use sqlness::{Database, EnvController, QueryContext};
43use tokio::sync::Mutex as TokioMutex;
44use tokio_postgres::{Client as PgClient, SimpleQueryMessage as PgRow};
45
46use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
47use crate::server_mode::ServerMode;
48use crate::util::{get_workspace_root, maybe_pull_binary, PROGRAM};
49use crate::{util, ServerAddr};
50
// Slots used with `GreptimeDBContext::set_server_mode` / `get_server_mode` to
// remember the mode of each spawned process so it can be restarted later.

/// Slot of the standalone server mode.
const SERVER_MODE_STANDALONE_IDX: usize = 0;
/// Slot of the metasrv mode. Shares index 0 with the standalone slot; a context
/// is populated either by `start_standalone` or by `start_distributed`.
const SERVER_MODE_METASRV_IDX: usize = 0;
/// Slot of the first datanode; the three datanodes occupy this index and the
/// two that follow it.
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
/// Slot of the frontend mode.
const SERVER_MODE_FRONTEND_IDX: usize = 4;
/// Slot of the flownode mode.
const SERVER_MODE_FLOWNODE_IDX: usize = 5;
58
/// Write-ahead-log backend used by the servers under test.
#[derive(Clone)]
pub enum WalConfig {
    /// Raft-engine WAL; no external service is set up for it by this file.
    RaftEngine,
    /// Kafka WAL.
    Kafka {
        /// When true, `util::setup_wal()` is called before the servers start
        /// and `util::teardown_wal()` when the database is stopped.
        needs_kafka_cluster: bool,
        /// Broker endpoints; serialized to JSON by
        /// `GreptimeDBContext::kafka_wal_broker_endpoints`.
        broker_endpoints: Vec<String>,
    },
}
69
/// Metadata store settings for distributed mode.
#[derive(Clone)]
pub struct StoreConfig {
    /// Store addresses; the port is taken from the part after the first `:`
    /// when one of the `setup_*` flags below is enabled.
    pub store_addrs: Vec<String>,
    /// Call `util::setup_etcd` with the ports of all `store_addrs`.
    pub setup_etcd: bool,
    /// Call `util::setup_pg` with the first port of `store_addrs` (default 5432).
    pub setup_pg: bool,
    /// Call `util::setup_mysql` with the first port of `store_addrs` (default 3306).
    pub setup_mysql: bool,
}
77
/// Test environment that starts (or connects to) GreptimeDB instances.
#[derive(Clone)]
pub struct Env {
    /// Exported as `SQLNESS_HOME`; server log files are created under it.
    sqlness_home: PathBuf,
    /// When `server_addr` is set, the environment connects to that server
    /// instead of spawning processes.
    server_addrs: ServerAddr,
    /// WAL backend handed to every `GreptimeDBContext` created by this env.
    wal: WalConfig,

    /// Directory the server binary is launched from. `None` until it is
    /// provided or `build_db` fills it in; replaced when a query asks for a
    /// specific version. Shared between clones of this `Env`.
    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    /// Known binary directories keyed by version name; seeded with `"latest"`.
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// Forwarded to `maybe_pull_binary` when a versioned binary is missing.
    pull_version_on_need: bool,
    /// Metadata store settings handed to every `GreptimeDBContext`.
    store_config: StoreConfig,
}
95
96#[async_trait]
97impl EnvController for Env {
98 type DB = GreptimeDB;
99
100 async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
101 if self.server_addrs.server_addr.is_some() && id > 0 {
102 panic!("Parallel test mode is not supported when server address is already set.");
103 }
104
105 std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
106 match mode {
107 "standalone" => self.start_standalone(id).await,
108 "distributed" => self.start_distributed(id).await,
109 _ => panic!("Unexpected mode: {mode}"),
110 }
111 }
112
113 async fn stop(&self, _mode: &str, mut database: Self::DB) {
115 database.stop();
116 }
117}
118
119impl Env {
120 pub fn new(
121 data_home: PathBuf,
122 server_addrs: ServerAddr,
123 wal: WalConfig,
124 pull_version_on_need: bool,
125 bins_dir: Option<PathBuf>,
126 store_config: StoreConfig,
127 ) -> Self {
128 Self {
129 sqlness_home: data_home,
130 server_addrs,
131 wal,
132 pull_version_on_need,
133 bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
134 versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
135 "latest".to_string(),
136 bins_dir.clone().unwrap_or(util::get_binary_dir("debug")),
137 )]))),
138 store_config,
139 }
140 }
141
142 async fn start_standalone(&self, id: usize) -> GreptimeDB {
143 println!("Starting standalone instance {id}");
144
145 if self.server_addrs.server_addr.is_some() {
146 self.connect_db(&self.server_addrs, id).await
147 } else {
148 self.build_db();
149 self.setup_wal();
150 let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
151
152 let server_mode = ServerMode::random_standalone();
153 db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
154 let server_addr = server_mode.server_addr().unwrap();
155 let server_process = self.start_server(server_mode, &db_ctx, id, true).await;
156
157 let mut greptimedb = self.connect_db(&server_addr, id).await;
158 greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
159 greptimedb.is_standalone = true;
160 greptimedb.ctx = db_ctx;
161
162 greptimedb
163 }
164 }
165
166 async fn start_distributed(&self, id: usize) -> GreptimeDB {
167 if self.server_addrs.server_addr.is_some() {
168 self.connect_db(&self.server_addrs, id).await
169 } else {
170 self.build_db();
171 self.setup_wal();
172 self.setup_etcd();
173 self.setup_pg();
174 self.setup_mysql().await;
175 let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
176
177 let meta_server_mode = ServerMode::random_metasrv();
179 let metasrv_port = match &meta_server_mode {
180 ServerMode::Metasrv { rpc_server_addr, .. } => rpc_server_addr
181 .split(':')
182 .nth(1)
183 .unwrap()
184 .parse::<u16>()
185 .unwrap(),
186 _ => panic!("metasrv mode not set, maybe running in remote mode which doesn't support restart?"),
187 };
188 db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
189 let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;
190
191 let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
192 db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
193 let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
194 let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
195 db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
196 let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
197 let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
198 db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
199 let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;
200
201 let frontend_mode = ServerMode::random_frontend(metasrv_port);
202 let server_addr = frontend_mode.server_addr().unwrap();
203 db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
204 let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;
205
206 let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
207 db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
208 let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;
209
210 let mut greptimedb = self.connect_db(&server_addr, id).await;
211
212 greptimedb.metasrv_process = Some(meta_server).into();
213 greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
214 datanode_1, datanode_2, datanode_3,
215 ])));
216 greptimedb.frontend_process = Some(frontend).into();
217 greptimedb.flownode_process = Some(flownode).into();
218 greptimedb.is_standalone = false;
219 greptimedb.ctx = db_ctx;
220
221 greptimedb
222 }
223 }
224
225 async fn create_pg_client(&self, pg_server_addr: &str) -> PgClient {
226 let sockaddr: SocketAddr = pg_server_addr.parse().expect(
227 "Failed to parse the Postgres server address. Please check if the address is in the format of `ip:port`.",
228 );
229 let mut config = tokio_postgres::config::Config::new();
230 config.host(sockaddr.ip().to_string());
231 config.port(sockaddr.port());
232 config.dbname(DEFAULT_SCHEMA_NAME);
233
234 const MAX_RETRY: usize = 3;
236 let mut backoff = Duration::from_millis(500);
237 for _ in 0..MAX_RETRY {
238 if let Ok((pg_client, conn)) = config.connect(tokio_postgres::NoTls).await {
239 tokio::spawn(conn);
240 return pg_client;
241 }
242 tokio::time::sleep(backoff).await;
243 backoff *= 2;
244 }
245 panic!("Failed to connect to Postgres server. Please check if the server is running.");
246 }
247
248 async fn create_mysql_client(&self, mysql_server_addr: &str) -> MySqlClient {
249 let sockaddr: SocketAddr = mysql_server_addr.parse().expect(
250 "Failed to parse the MySQL server address. Please check if the address is in the format of `ip:port`.",
251 );
252 let ops = mysql::OptsBuilder::new()
253 .ip_or_hostname(Some(sockaddr.ip().to_string()))
254 .tcp_port(sockaddr.port())
255 .db_name(Some(DEFAULT_SCHEMA_NAME));
256 const MAX_RETRY: usize = 3;
258 let mut backoff = Duration::from_millis(500);
259
260 for _ in 0..MAX_RETRY {
261 if let Ok(client) = mysql::Conn::new(ops.clone()) {
263 return client;
264 }
265 tokio::time::sleep(backoff).await;
266 backoff *= 2;
267 }
268
269 panic!("Failed to connect to MySQL server. Please check if the server is running.")
270 }
271
272 async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
273 let grpc_server_addr = server_addr.server_addr.clone().unwrap();
274 let pg_server_addr = server_addr.pg_server_addr.clone().unwrap();
275 let mysql_server_addr = server_addr.mysql_server_addr.clone().unwrap();
276
277 let grpc_client = Client::with_urls(vec![grpc_server_addr.clone()]);
278 let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
279 let pg_client = self.create_pg_client(&pg_server_addr).await;
280 let mysql_client = self.create_mysql_client(&mysql_server_addr).await;
281
282 GreptimeDB {
283 grpc_client: TokioMutex::new(db),
284 pg_client: TokioMutex::new(pg_client),
285 mysql_client: TokioMutex::new(mysql_client),
286 server_processes: None,
287 metasrv_process: None.into(),
288 frontend_process: None.into(),
289 flownode_process: None.into(),
290 ctx: GreptimeDBContext {
291 time: 0,
292 datanode_id: Default::default(),
293 wal: self.wal.clone(),
294 store_config: self.store_config.clone(),
295 server_modes: Vec::new(),
296 },
297 is_standalone: false,
298 env: self.clone(),
299 id,
300 }
301 }
302
303 fn stop_server(process: &mut Child) {
304 let _ = process.kill();
305 let _ = process.wait();
306 }
307
308 async fn start_server(
309 &self,
310 mode: ServerMode,
311 db_ctx: &GreptimeDBContext,
312 id: usize,
313 truncate_log: bool,
314 ) -> Child {
315 let log_file_name = match mode {
316 ServerMode::Datanode { node_id, .. } => {
317 db_ctx.incr_datanode_id();
318 format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
319 }
320 ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
321 ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
322 ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
323 ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
324 };
325 let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();
326
327 println!("DB instance {id} log file at {stdout_file_name}");
328
329 let stdout_file = OpenOptions::new()
330 .create(true)
331 .write(true)
332 .truncate(truncate_log)
333 .append(!truncate_log)
334 .open(stdout_file_name)
335 .unwrap();
336
337 let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
338 let check_ip_addrs = mode.check_addrs();
339
340 for check_ip_addr in &check_ip_addrs {
341 if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
342 panic!(
343 "Port {check_ip_addr} is already in use, please check and retry.",
344 check_ip_addr = check_ip_addr
345 );
346 }
347 }
348
349 let program = PROGRAM;
350
351 let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
352 "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
353 );
354
355 let mut process = Command::new(program)
356 .current_dir(bins_dir.clone())
357 .env("TZ", "UTC")
358 .args(args)
359 .stdout(stdout_file)
360 .spawn()
361 .unwrap_or_else(|error| {
362 panic!(
363 "Failed to start the DB with subcommand {}, Error: {error}, path: {:?}",
364 mode.name(),
365 bins_dir.join(program)
366 );
367 });
368
369 for check_ip_addr in &check_ip_addrs {
370 if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
371 Env::stop_server(&mut process);
372 panic!("{} doesn't up in 10 seconds, quit.", mode.name())
373 }
374 }
375
376 process
377 }
378
    /// Stops and restarts the processes behind `db`, reusing the server modes
    /// recorded in `db.ctx` and appending to the existing log files.
    ///
    /// The standalone server / datanodes and the flownode are always
    /// restarted. Metasrv and frontend are only restarted when
    /// `is_full_restart` is true.
    ///
    /// # Panics
    ///
    /// Panics when a required server mode was never recorded in `db.ctx`, when
    /// a lock is poisoned, or when a process fails to start.
    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
        // Phase 1: stop. Scoped in its own block so the std mutex guards are
        // released before the awaits below.
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        // Phase 2: start again, in the same order as the initial startup.
        let new_server_processes = if db.is_standalone {
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;

            // Replace the Postgres and MySQL clients with fresh connections to
            // the restarted server.
            *db.pg_client.lock().await = self
                .create_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            *db.mysql_client.lock().await = self
                .create_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            // `start_server` increments the counter again for every datanode.
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

                // NOTE(review): presumably gives metasrv time to become ready
                // before the datanodes register with it - confirm.
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server(datanode_mode, &db.ctx, db.id, false)
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server(frontend_mode, &db.ctx, db.id, false)
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server(flownode_mode, &db.ctx, db.id, false)
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        // Publish the new standalone / datanode handles so `stop` can reach them.
        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }
494
495 fn setup_wal(&self) {
497 if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
498 util::setup_wal();
499 }
500 }
501
502 fn setup_etcd(&self) {
504 if self.store_config.setup_etcd {
505 let client_ports = self
506 .store_config
507 .store_addrs
508 .iter()
509 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
510 .collect::<Vec<_>>();
511 util::setup_etcd(client_ports, None, None);
512 }
513 }
514
515 fn setup_pg(&self) {
517 if self.store_config.setup_pg {
518 let client_ports = self
519 .store_config
520 .store_addrs
521 .iter()
522 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
523 .collect::<Vec<_>>();
524 let client_port = client_ports.first().unwrap_or(&5432);
525 util::setup_pg(*client_port, None);
526 }
527 }
528
529 async fn setup_mysql(&self) {
531 if self.store_config.setup_mysql {
532 let client_ports = self
533 .store_config
534 .store_addrs
535 .iter()
536 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
537 .collect::<Vec<_>>();
538 let client_port = client_ports.first().unwrap_or(&3306);
539 util::setup_mysql(*client_port, None);
540
541 tokio::time::sleep(Duration::from_secs(10)).await;
543 }
544 }
545
546 fn build_db(&self) {
548 if self.bins_dir.lock().unwrap().is_some() {
549 return;
550 }
551
552 println!("Going to build the DB...");
553 let output = Command::new("cargo")
554 .current_dir(util::get_workspace_root())
555 .args([
556 "build",
557 "--bin",
558 "greptime",
559 "--features",
560 "pg_kvbackend,mysql_kvbackend",
561 ])
562 .output()
563 .expect("Failed to start GreptimeDB");
564 if !output.status.success() {
565 println!("Failed to build GreptimeDB, {}", output.status);
566 println!("Cargo build stdout:");
567 io::stdout().write_all(&output.stdout).unwrap();
568 println!("Cargo build stderr:");
569 io::stderr().write_all(&output.stderr).unwrap();
570 panic!();
571 }
572
573 let _ = self
574 .bins_dir
575 .lock()
576 .unwrap()
577 .insert(util::get_binary_dir("debug"));
578 }
579}
580
/// Handle to a running database: its clients and, when the servers were
/// spawned by the test environment, the child processes behind it.
pub struct GreptimeDB {
    /// The standalone process, or the datanode processes in distributed mode.
    /// `None` when the handle was only connected to an existing server.
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    /// Metasrv process (distributed mode only).
    metasrv_process: Mutex<Option<Child>>,
    /// Frontend process (distributed mode only).
    frontend_process: Mutex<Option<Child>>,
    /// Flownode process (distributed mode only).
    flownode_process: Mutex<Option<Child>>,
    /// gRPC client; used for queries that do not select another protocol.
    grpc_client: TokioMutex<DB>,
    /// Postgres client; replaced when a standalone server is restarted.
    pg_client: TokioMutex<PgClient>,
    /// MySQL client; replaced when a standalone server is restarted.
    mysql_client: TokioMutex<MySqlClient>,
    /// Context shared with the spawned servers, including their recorded modes.
    ctx: GreptimeDBContext,
    /// Whether the servers were started in standalone mode.
    is_standalone: bool,
    /// Environment that created this handle; used to restart servers.
    env: Env,
    /// Instance id, used in log file names.
    id: usize,
}
594
595impl GreptimeDB {
596 async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
597 let client = self.pg_client.lock().await;
598 match client.simple_query(&query).await {
599 Ok(rows) => Box::new(PostgresqlFormatter { rows }),
600 Err(e) => Box::new(format!("Failed to execute query, encountered: {:?}", e)),
601 }
602 }
603
604 async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
605 let mut conn = self.mysql_client.lock().await;
606 let result = conn.query_iter(query);
607 Box::new(match result {
608 Ok(result) => {
609 let mut rows = vec![];
610 let affected_rows = result.affected_rows();
611 for row in result {
612 match row {
613 Ok(r) => rows.push(r),
614 Err(e) => {
615 return Box::new(format!("Failed to parse query result, err: {:?}", e))
616 }
617 }
618 }
619
620 if rows.is_empty() {
621 format!("affected_rows: {}", affected_rows)
622 } else {
623 format!("{}", MysqlFormatter { rows })
624 }
625 }
626 Err(e) => format!("Failed to execute query, err: {:?}", e),
627 })
628 }
629
630 async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
631 let mut client = self.grpc_client.lock().await;
632
633 let query_str = query.trim().to_lowercase();
634
635 if query_str.starts_with("use ") {
636 let database = query
638 .split_ascii_whitespace()
639 .nth(1)
640 .expect("Illegal `USE` statement: expecting a database.")
641 .trim_end_matches(';');
642 client.set_schema(database);
643 Box::new(ResultDisplayer {
644 result: Ok(Output::new_with_affected_rows(0)),
645 }) as _
646 } else if query_str.starts_with("set time_zone")
647 || query_str.starts_with("set session time_zone")
648 || query_str.starts_with("set local time_zone")
649 {
650 let timezone = query
652 .split('=')
653 .nth(1)
654 .expect("Illegal `SET TIMEZONE` statement: expecting a timezone expr.")
655 .trim()
656 .strip_prefix('\'')
657 .unwrap()
658 .strip_suffix("';")
659 .unwrap();
660
661 client.set_timezone(timezone);
662
663 Box::new(ResultDisplayer {
664 result: Ok(Output::new_with_affected_rows(0)),
665 }) as _
666 } else {
667 let mut result = client.sql(&query).await;
668 if let Ok(Output {
669 data: OutputData::Stream(stream),
670 ..
671 }) = result
672 {
673 match RecordBatches::try_collect(stream).await {
674 Ok(recordbatches) => {
675 result = Ok(Output::new_with_record_batches(recordbatches));
676 }
677 Err(e) => {
678 let status_code = e.status_code();
679 let msg = e.output_msg();
680 result = ServerSnafu {
681 code: status_code,
682 msg,
683 }
684 .fail();
685 }
686 }
687 }
688 Box::new(ResultDisplayer { result }) as _
689 }
690 }
691}
692
693#[async_trait]
694impl Database for GreptimeDB {
695 async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
696 if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
697 self.env.restart_server(self, false).await;
698 } else if let Some(version) = ctx.context.get("version") {
699 let version_bin_dir = self
700 .env
701 .versioned_bins_dirs
702 .lock()
703 .expect("lock poison")
704 .get(version.as_str())
705 .cloned();
706
707 match version_bin_dir {
708 Some(path) if path.clone().join(PROGRAM).is_file() => {
709 *self.env.bins_dir.lock().unwrap() = Some(path.clone());
711 }
712 _ => {
713 maybe_pull_binary(version, self.env.pull_version_on_need).await;
715 let root = get_workspace_root();
716 let new_path = PathBuf::from_iter([&root, version]);
717 *self.env.bins_dir.lock().unwrap() = Some(new_path);
718 }
719 }
720
721 self.env.restart_server(self, true).await;
722 tokio::time::sleep(Duration::from_secs(5)).await;
724 }
725
726 if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
727 if protocol == MYSQL {
729 self.mysql_query(ctx, query).await
730 } else {
731 self.postgres_query(ctx, query).await
732 }
733 } else {
734 self.grpc_query(ctx, query).await
735 }
736 }
737}
738
739impl GreptimeDB {
740 fn stop(&mut self) {
741 if let Some(server_processes) = self.server_processes.clone() {
742 let mut server_processes = server_processes.lock().unwrap();
743 for mut server_process in server_processes.drain(..) {
744 Env::stop_server(&mut server_process);
745 println!(
746 "Standalone or Datanode (pid = {}) is stopped",
747 server_process.id()
748 );
749 }
750 }
751 if let Some(mut metasrv) = self
752 .metasrv_process
753 .lock()
754 .expect("someone else panic when holding lock")
755 .take()
756 {
757 Env::stop_server(&mut metasrv);
758 println!("Metasrv (pid = {}) is stopped", metasrv.id());
759 }
760 if let Some(mut frontend) = self
761 .frontend_process
762 .lock()
763 .expect("someone else panic when holding lock")
764 .take()
765 {
766 Env::stop_server(&mut frontend);
767 println!("Frontend (pid = {}) is stopped", frontend.id());
768 }
769 if let Some(mut flownode) = self
770 .flownode_process
771 .lock()
772 .expect("someone else panic when holding lock")
773 .take()
774 {
775 Env::stop_server(&mut flownode);
776 println!("Flownode (pid = {}) is stopped", flownode.id());
777 }
778 if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
779 {
780 util::teardown_wal();
781 }
782 }
783}
784
785impl Drop for GreptimeDB {
786 fn drop(&mut self) {
787 if self.env.server_addrs.server_addr.is_none() {
788 self.stop();
789 }
790 }
791}
792
/// State shared by the servers spawned for one database handle.
pub struct GreptimeDBContext {
    /// Creation time in milliseconds as set by `new`; 0 for contexts built by
    /// `connect_db`.
    time: i64,
    /// Counter bumped each time a datanode is started and reset before the
    /// datanodes are restarted.
    datanode_id: AtomicU32,
    /// WAL backend of the servers.
    wal: WalConfig,
    /// Metadata store settings of the servers.
    store_config: StoreConfig,
    /// Modes of the spawned servers, indexed by the `SERVER_MODE_*_IDX`
    /// constants.
    server_modes: Vec<ServerMode>,
}
801
802impl GreptimeDBContext {
803 pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
804 Self {
805 time: common_time::util::current_time_millis(),
806 datanode_id: AtomicU32::new(0),
807 wal,
808 store_config,
809 server_modes: Vec::new(),
810 }
811 }
812
813 pub(crate) fn time(&self) -> i64 {
814 self.time
815 }
816
817 pub fn is_raft_engine(&self) -> bool {
818 matches!(self.wal, WalConfig::RaftEngine)
819 }
820
821 pub fn kafka_wal_broker_endpoints(&self) -> String {
822 match &self.wal {
823 WalConfig::RaftEngine => String::new(),
824 WalConfig::Kafka {
825 broker_endpoints, ..
826 } => serde_json::to_string(&broker_endpoints).unwrap(),
827 }
828 }
829
830 fn incr_datanode_id(&self) {
831 let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
832 }
833
834 fn reset_datanode_id(&self) {
835 self.datanode_id.store(0, Ordering::Relaxed);
836 }
837
838 pub(crate) fn store_config(&self) -> StoreConfig {
839 self.store_config.clone()
840 }
841
842 fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
843 if idx >= self.server_modes.len() {
844 self.server_modes.resize(idx + 1, mode.clone());
845 }
846 self.server_modes[idx] = mode;
847 }
848
849 fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
850 self.server_modes.get(idx)
851 }
852}
853
854struct ResultDisplayer {
855 result: Result<Output, ClientError>,
856}
857
858impl Display for ResultDisplayer {
859 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
860 match &self.result {
861 Ok(result) => match &result.data {
862 OutputData::AffectedRows(rows) => {
863 write!(f, "Affected Rows: {rows}")
864 }
865 OutputData::RecordBatches(recordbatches) => {
866 let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
867 match pretty {
868 Ok(s) => write!(f, "{s}"),
869 Err(e) => {
870 write!(f, "Failed to pretty format {recordbatches:?}, error: {e}")
871 }
872 }
873 }
874 OutputData::Stream(_) => unreachable!(),
875 },
876 Err(e) => {
877 let status_code = e.status_code();
878 let root_cause = e.output_msg();
879 write!(
880 f,
881 "Error: {}({status_code}), {root_cause}",
882 status_code as u32
883 )
884 }
885 }
886 }
887}
888
889struct PostgresqlFormatter {
890 pub rows: Vec<PgRow>,
891}
892
893impl Display for PostgresqlFormatter {
894 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
895 if self.rows.is_empty() {
896 return f.write_fmt(format_args!("(Empty response)"));
897 }
898
899 let schema = match &self.rows[0] {
901 PgRow::CommandComplete(affected_rows) => {
902 write!(
903 f,
904 "{}",
905 ResultDisplayer {
906 result: Ok(Output::new_with_affected_rows(*affected_rows as usize)),
907 }
908 )?;
909 return Ok(());
910 }
911 PgRow::RowDescription(desc) => Arc::new(Schema::new(
912 desc.iter()
913 .map(|column| {
914 ColumnSchema::new(column.name(), ConcreteDataType::string_datatype(), false)
915 })
916 .collect(),
917 )),
918 _ => unreachable!(),
919 };
920 if schema.num_columns() == 0 {
921 return Ok(());
922 }
923
924 let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
926 .map(|_| StringVectorBuilder::with_capacity(schema.num_columns()))
927 .collect();
928 for row in self.rows.iter().skip(1) {
929 if let PgRow::Row(row) = row {
930 for (i, column) in columns.iter_mut().enumerate().take(schema.num_columns()) {
931 column.push(row.get(i));
932 }
933 }
934 }
935 let columns: Vec<VectorRef> = columns
936 .into_iter()
937 .map(|mut col| Arc::new(col.finish()) as VectorRef)
938 .collect();
939
940 let recordbatches = RecordBatches::try_from_columns(schema, columns)
942 .expect("Failed to construct recordbatches from columns. Please check the schema.");
943 let result_displayer = ResultDisplayer {
944 result: Ok(Output::new_with_record_batches(recordbatches)),
945 };
946 write!(f, "{}", result_displayer)?;
947
948 Ok(())
949 }
950}
951
952struct MysqlFormatter {
953 pub rows: Vec<MySqlRow>,
954}
955
956impl Display for MysqlFormatter {
957 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
958 if self.rows.is_empty() {
959 return f.write_fmt(format_args!("(Empty response)"));
960 }
961 let head_column = &self.rows[0];
963 let head_binding = head_column.columns();
964 let names = head_binding
965 .iter()
966 .map(|column| column.name_str())
967 .collect::<Vec<Cow<str>>>();
968 let schema = Arc::new(Schema::new(
969 names
970 .iter()
971 .map(|name| {
972 ColumnSchema::new(name.to_string(), ConcreteDataType::string_datatype(), false)
973 })
974 .collect(),
975 ));
976
977 let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
979 .map(|_| StringVectorBuilder::with_capacity(schema.num_columns()))
980 .collect();
981 for row in self.rows.iter() {
982 for (i, name) in names.iter().enumerate() {
983 columns[i].push(row.get::<String, &str>(name).as_deref());
984 }
985 }
986 let columns: Vec<VectorRef> = columns
987 .into_iter()
988 .map(|mut col| Arc::new(col.finish()) as VectorRef)
989 .collect();
990
991 let recordbatches = RecordBatches::try_from_columns(schema, columns)
993 .expect("Failed to construct recordbatches from columns. Please check the schema.");
994 let result_displayer = ResultDisplayer {
995 result: Ok(Output::new_with_record_batches(recordbatches)),
996 };
997 write!(f, "{}", result_displayer)?;
998
999 Ok(())
1000 }
1001}