sqlness_runner/env.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Display;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use client::error::ServerSnafu;
use client::{
    Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use mysql::prelude::Queryable;
use mysql::{Conn as MySqlClient, Row as MySqlRow};
use sqlness::{Database, EnvController, QueryContext};
use tokio::sync::Mutex as TokioMutex;
use tokio_postgres::{Client as PgClient, SimpleQueryMessage as PgRow};

use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
use crate::server_mode::ServerMode;
use crate::util::{get_workspace_root, maybe_pull_binary, PROGRAM};
use crate::{util, ServerAddr};

// Indices of the server modes stored in `GreptimeDBContext::server_modes`.
// Standalone mode:
const SERVER_MODE_STANDALONE_IDX: usize = 0;
// Distributed mode:
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;

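/// How the WAL of the tested GreptimeDB is provided: a local raft-engine WAL,
/// or a remote WAL backed by a Kafka cluster.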
#[derive(Clone)]
pub enum WalConfig {
    RaftEngine,
    Kafka {
        /// Indicates whether the runner needs to start a Kafka cluster itself
        /// (one might already be available in the external environment).
        needs_kafka_cluster: bool,
        broker_endpoints: Vec<String>,
    },
}

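/// Configuration of the backend store for metasrv metadata, and whether this
/// runner should set up that store (etcd, PostgreSQL or MySQL) itself.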
#[derive(Clone)]
pub struct StoreConfig {
    pub store_addrs: Vec<String>,
    pub setup_etcd: bool,
    pub setup_pg: bool,
    pub setup_mysql: bool,
}

#[derive(Clone)]
pub struct Env {
    sqlness_home: PathBuf,
    server_addrs: ServerAddr,
    wal: WalConfig,

    /// The path to the directory that contains the pre-built GreptimeDB binary.
    /// When running in CI, this is expected to be set.
    /// If not set, this runner will build the GreptimeDB binary itself when needed,
    /// and will set this field at that point.
    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    /// The paths to the directories that contain older pre-built GreptimeDB
    /// binaries, keyed by version.
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// Whether to pull other versions of GreptimeDB on demand.
    pull_version_on_need: bool,
    /// Store configuration for metasrv metadata.
    store_config: StoreConfig,
}

#[async_trait]
impl EnvController for Env {
    type DB = GreptimeDB;

    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
        if self.server_addrs.server_addr.is_some() && id > 0 {
            panic!("Parallel test mode is not supported when server address is already set.");
        }

        std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
        match mode {
            "standalone" => self.start_standalone(id).await,
            "distributed" => self.start_distributed(id).await,
            _ => panic!("Unexpected mode: {mode}"),
        }
    }

    /// Stop one [`Database`].
    async fn stop(&self, _mode: &str, mut database: Self::DB) {
        database.stop();
    }
}

impl Env {
    pub fn new(
        data_home: PathBuf,
        server_addrs: ServerAddr,
        wal: WalConfig,
        pull_version_on_need: bool,
        bins_dir: Option<PathBuf>,
        store_config: StoreConfig,
    ) -> Self {
        Self {
            sqlness_home: data_home,
            server_addrs,
            wal,
            pull_version_on_need,
            bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
            versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
                "latest".to_string(),
                bins_dir.unwrap_or(util::get_binary_dir("debug")),
            )]))),
            store_config,
        }
    }

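    /// Starts a standalone GreptimeDB instance, or just connects to it if an
    /// external server address is given.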
    async fn start_standalone(&self, id: usize) -> GreptimeDB {
        println!("Starting standalone instance {id}");

        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let server_mode = ServerMode::random_standalone();
            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
            let server_addr = server_mode.server_addr().unwrap();
            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
            greptimedb.is_standalone = true;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

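    /// Starts a distributed GreptimeDB cluster: one metasrv, three datanodes,
    /// one frontend and one flownode; or just connects to an external cluster
    /// if a server address is given.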
    async fn start_distributed(&self, id: usize) -> GreptimeDB {
        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            self.setup_etcd();
            self.setup_pg();
            self.setup_mysql().await;
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            // Start a distributed GreptimeDB.
            let meta_server_mode = ServerMode::random_metasrv();
            let metasrv_port = match &meta_server_mode {
                ServerMode::Metasrv { rpc_server_addr, .. } => rpc_server_addr
                    .split(':')
                    .nth(1)
                    .unwrap()
                    .parse::<u16>()
                    .unwrap(),
                _ => panic!("metasrv mode is not set; maybe running in remote mode, which doesn't support restart?"),
            };
            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;

            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;

            let frontend_mode = ServerMode::random_frontend(metasrv_port);
            let server_addr = frontend_mode.server_addr().unwrap();
            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;

            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;

            greptimedb.metasrv_process = Some(meta_server).into();
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
                datanode_1, datanode_2, datanode_3,
            ])));
            greptimedb.frontend_process = Some(frontend).into();
            greptimedb.flownode_process = Some(flownode).into();
            greptimedb.is_standalone = false;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

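    /// Creates a Postgres client connected to the given address.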
    async fn create_pg_client(&self, pg_server_addr: &str) -> PgClient {
        let sockaddr: SocketAddr = pg_server_addr.parse().expect(
            "Failed to parse the Postgres server address. Please check that the address is in the `ip:port` format.",
        );
        let mut config = tokio_postgres::config::Config::new();
        config.host(sockaddr.ip().to_string());
        config.port(sockaddr.port());
        config.dbname(DEFAULT_SCHEMA_NAME);

        // Retry connecting to the Postgres server, with exponential backoff,
        // up to MAX_RETRY times before giving up.
        const MAX_RETRY: usize = 3;
        let mut backoff = Duration::from_millis(500);
        for _ in 0..MAX_RETRY {
            if let Ok((pg_client, conn)) = config.connect(tokio_postgres::NoTls).await {
                tokio::spawn(conn);
                return pg_client;
            }
            tokio::time::sleep(backoff).await;
            backoff *= 2;
        }
        panic!("Failed to connect to the Postgres server. Please check that the server is running.");
    }

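    /// Creates a MySQL client connected to the given address.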
    async fn create_mysql_client(&self, mysql_server_addr: &str) -> MySqlClient {
        let sockaddr: SocketAddr = mysql_server_addr.parse().expect(
            "Failed to parse the MySQL server address. Please check that the address is in the `ip:port` format.",
        );
        let ops = mysql::OptsBuilder::new()
            .ip_or_hostname(Some(sockaddr.ip().to_string()))
            .tcp_port(sockaddr.port())
            .db_name(Some(DEFAULT_SCHEMA_NAME));

        // Retry connecting to the MySQL server, with exponential backoff,
        // up to MAX_RETRY times before giving up.
        const MAX_RETRY: usize = 3;
        let mut backoff = Duration::from_millis(500);

        for _ in 0..MAX_RETRY {
            if let Ok(client) = mysql::Conn::new(ops.clone()) {
                return client;
            }
            tokio::time::sleep(backoff).await;
            backoff *= 2;
        }

        panic!("Failed to connect to the MySQL server. Please check that the server is running.")
    }

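    /// Creates the gRPC, Postgres and MySQL clients for the given server
    /// addresses and assembles them into a [`GreptimeDB`] handle (without any
    /// managed server processes).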
    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
        let grpc_server_addr = server_addr.server_addr.clone().unwrap();
        let pg_server_addr = server_addr.pg_server_addr.clone().unwrap();
        let mysql_server_addr = server_addr.mysql_server_addr.clone().unwrap();

        let grpc_client = Client::with_urls(vec![grpc_server_addr.clone()]);
        let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
        let pg_client = self.create_pg_client(&pg_server_addr).await;
        let mysql_client = self.create_mysql_client(&mysql_server_addr).await;

        GreptimeDB {
            grpc_client: TokioMutex::new(db),
            pg_client: TokioMutex::new(pg_client),
            mysql_client: TokioMutex::new(mysql_client),
            server_processes: None,
            metasrv_process: None.into(),
            frontend_process: None.into(),
            flownode_process: None.into(),
            ctx: GreptimeDBContext {
                time: 0,
                datanode_id: Default::default(),
                wal: self.wal.clone(),
                store_config: self.store_config.clone(),
                server_modes: Vec::new(),
            },
            is_standalone: false,
            env: self.clone(),
            id,
        }
    }

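    /// Kills the server process and waits for it to exit.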
    fn stop_server(process: &mut Child) {
        let _ = process.kill();
        let _ = process.wait();
    }

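    /// Spawns a `greptime` server process in the given mode, redirecting its
    /// stdout to a per-instance log file, and waits until its ports are open.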
    async fn start_server(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
    ) -> Child {
        let log_file_name = match mode {
            ServerMode::Datanode { node_id, .. } => {
                db_ctx.incr_datanode_id();
                format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
            }
            ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
            ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
            ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
            ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
        };
        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();

        println!("DB instance {id} log file at {stdout_file_name}");

        let stdout_file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(truncate_log)
            .append(!truncate_log)
            .open(stdout_file_name)
            .unwrap();

        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
        let check_ip_addrs = mode.check_addrs();

        for check_ip_addr in &check_ip_addrs {
            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
                panic!("Port {check_ip_addr} is already in use, please check and retry.");
            }
        }

        let program = PROGRAM;

        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary, or call `self.build_db()` beforehand.",
        );

        let mut process = Command::new(program)
            .current_dir(bins_dir.clone())
            .env("TZ", "UTC")
            .args(args)
            .stdout(stdout_file)
            .spawn()
            .unwrap_or_else(|error| {
                panic!(
                    "Failed to start the DB with subcommand {}, error: {error}, path: {:?}",
                    mode.name(),
                    bins_dir.join(program)
                );
            });

        for check_ip_addr in &check_ip_addrs {
            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
                Env::stop_server(&mut process);
                panic!("{} didn't come up in 10 seconds, quitting.", mode.name())
            }
        }

        process
    }

    /// Stops and restarts the server processes.
    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        // Check whether the server is distributed or standalone.
        let new_server_processes = if db.is_standalone {
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;

            *db.pg_client.lock().await = self
                .create_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            *db.mysql_client.lock().await = self
                .create_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

                // Wait for the metasrv to start, since older versions of the DB
                // might take longer to complete the election.
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server(datanode_mode, &db.ctx, db.id, false)
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server(frontend_mode, &db.ctx, db.id, false)
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server(flownode_mode, &db.ctx, db.id, false)
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }

    /// Sets up a Kafka WAL cluster if needed. The counterpart is in [`GreptimeDB::stop`].
    fn setup_wal(&self) {
        if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
            util::setup_wal();
        }
    }

    /// Sets up etcd if needed.
    fn setup_etcd(&self) {
        if self.store_config.setup_etcd {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            util::setup_etcd(client_ports, None, None);
        }
    }

    /// Sets up PostgreSQL if needed.
    fn setup_pg(&self) {
        if self.store_config.setup_pg {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&5432);
            util::setup_pg(*client_port, None);
        }
    }

    /// Sets up MySQL if needed.
    async fn setup_mysql(&self) {
        if self.store_config.setup_mysql {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&3306);
            util::setup_mysql(*client_port, None);

            // The MySQL Docker container starts slowly, so wait a while for it.
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }

    /// Builds the DB with `cargo build --bin greptime`.
    fn build_db(&self) {
        if self.bins_dir.lock().unwrap().is_some() {
            return;
        }

        println!("Going to build the DB...");
        let output = Command::new("cargo")
            .current_dir(util::get_workspace_root())
            .args([
                "build",
                "--bin",
                "greptime",
                "--features",
                "pg_kvbackend,mysql_kvbackend",
            ])
            .output()
            .expect("Failed to run `cargo build`");
        if !output.status.success() {
            println!("Failed to build GreptimeDB, {}", output.status);
            println!("Cargo build stdout:");
            io::stdout().write_all(&output.stdout).unwrap();
            println!("Cargo build stderr:");
            io::stderr().write_all(&output.stderr).unwrap();
            panic!();
        }

        let _ = self
            .bins_dir
            .lock()
            .unwrap()
            .insert(util::get_binary_dir("debug"));
    }
}

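/// A running GreptimeDB under test: the handles of its server processes (when
/// managed by this runner) and the clients connected to it.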
pub struct GreptimeDB {
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    metasrv_process: Mutex<Option<Child>>,
    frontend_process: Mutex<Option<Child>>,
    flownode_process: Mutex<Option<Child>>,
    grpc_client: TokioMutex<DB>,
    pg_client: TokioMutex<PgClient>,
    mysql_client: TokioMutex<MySqlClient>,
    ctx: GreptimeDBContext,
    is_standalone: bool,
    env: Env,
    id: usize,
}

impl GreptimeDB {
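    /// Runs the query over the Postgres protocol and formats the response.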
    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let client = self.pg_client.lock().await;
        match client.simple_query(&query).await {
            Ok(rows) => Box::new(PostgresqlFormatter { rows }),
            Err(e) => Box::new(format!("Failed to execute query, encountered: {:?}", e)),
        }
    }

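    /// Runs the query over the MySQL protocol and formats the response.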
    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut conn = self.mysql_client.lock().await;
        let result = conn.query_iter(query);
        Box::new(match result {
            Ok(result) => {
                let mut rows = vec![];
                let affected_rows = result.affected_rows();
                for row in result {
                    match row {
                        Ok(r) => rows.push(r),
                        Err(e) => {
                            return Box::new(format!("Failed to parse query result, err: {:?}", e))
                        }
                    }
                }

                if rows.is_empty() {
                    format!("affected_rows: {}", affected_rows)
                } else {
                    format!("{}", MysqlFormatter { rows })
                }
            }
            Err(e) => format!("Failed to execute query, err: {:?}", e),
        })
    }

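    /// Runs the query over the gRPC protocol. `USE` and `SET TIME_ZONE`
    /// statements are handled on the client side; everything else is sent to
    /// the server, and streamed results are collected before display.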
    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.grpc_client.lock().await;

        let query_str = query.trim().to_lowercase();

        if query_str.starts_with("use ") {
            // use [db]
            let database = query
                .split_ascii_whitespace()
                .nth(1)
                .expect("Illegal `USE` statement: expecting a database.")
                .trim_end_matches(';');
            client.set_schema(database);
            Box::new(ResultDisplayer {
                result: Ok(Output::new_with_affected_rows(0)),
            }) as _
        } else if query_str.starts_with("set time_zone")
            || query_str.starts_with("set session time_zone")
            || query_str.starts_with("set local time_zone")
        {
            // set time_zone='xxx'
            let timezone = query
                .split('=')
                .nth(1)
                .expect("Illegal `SET TIMEZONE` statement: expecting a timezone expr.")
                .trim()
                .strip_prefix('\'')
                .unwrap()
                .strip_suffix("';")
                .unwrap();

            client.set_timezone(timezone);

            Box::new(ResultDisplayer {
                result: Ok(Output::new_with_affected_rows(0)),
            }) as _
        } else {
            let mut result = client.sql(&query).await;
            if let Ok(Output {
                data: OutputData::Stream(stream),
                ..
            }) = result
            {
                match RecordBatches::try_collect(stream).await {
                    Ok(recordbatches) => {
                        result = Ok(Output::new_with_record_batches(recordbatches));
                    }
                    Err(e) => {
                        let status_code = e.status_code();
                        let msg = e.output_msg();
                        result = ServerSnafu {
                            code: status_code,
                            msg,
                        }
                        .fail();
                    }
                }
            }
            Box::new(ResultDisplayer { result }) as _
        }
    }
}

#[async_trait]
impl Database for GreptimeDB {
    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
            self.env.restart_server(self, false).await;
        } else if let Some(version) = ctx.context.get("version") {
            let version_bin_dir = self
                .env
                .versioned_bins_dirs
                .lock()
                .expect("lock poison")
                .get(version.as_str())
                .cloned();

            match version_bin_dir {
                Some(path) if path.join(PROGRAM).is_file() => {
                    // Use the binary recorded in `versioned_bins_dirs`.
                    *self.env.bins_dir.lock().unwrap() = Some(path);
                }
                _ => {
                    // Pull the binary for this version if needed, then use it.
                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
                    let root = get_workspace_root();
                    let new_path = PathBuf::from_iter([&root, version]);
                    *self.env.bins_dir.lock().unwrap() = Some(new_path);
                }
            }

            self.env.restart_server(self, true).await;
            // Sleep for a while to wait for the server to fully boot up.
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
            // The protocol is bound to be either "mysql" or "postgres".
            if protocol == MYSQL {
                self.mysql_query(ctx, query).await
            } else {
                self.postgres_query(ctx, query).await
            }
        } else {
            self.grpc_query(ctx, query).await
        }
    }
}

impl GreptimeDB {
    fn stop(&mut self) {
        if let Some(server_processes) = self.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            for mut server_process in server_processes.drain(..) {
                Env::stop_server(&mut server_process);
                println!(
                    "Standalone or Datanode (pid = {}) is stopped",
                    server_process.id()
                );
            }
        }
        if let Some(mut metasrv) = self
            .metasrv_process
            .lock()
            .expect("someone else panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut metasrv);
            println!("Metasrv (pid = {}) is stopped", metasrv.id());
        }
        if let Some(mut frontend) = self
            .frontend_process
            .lock()
            .expect("someone else panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut frontend);
            println!("Frontend (pid = {}) is stopped", frontend.id());
        }
        if let Some(mut flownode) = self
            .flownode_process
            .lock()
            .expect("someone else panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut flownode);
            println!("Flownode (pid = {}) is stopped", flownode.id());
        }
        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
        {
            util::teardown_wal();
        }
    }
}

impl Drop for GreptimeDB {
    fn drop(&mut self) {
        if self.env.server_addrs.server_addr.is_none() {
            self.stop();
        }
    }
}

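/// The runtime context of a GreptimeDB instance, shared between the initial
/// start and later restarts.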
pub struct GreptimeDBContext {
    /// Start time in milliseconds.
    time: i64,
    datanode_id: AtomicU32,
    wal: WalConfig,
    store_config: StoreConfig,
    server_modes: Vec<ServerMode>,
}

impl GreptimeDBContext {
    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
        Self {
            time: common_time::util::current_time_millis(),
            datanode_id: AtomicU32::new(0),
            wal,
            store_config,
            server_modes: Vec::new(),
        }
    }

    pub(crate) fn time(&self) -> i64 {
        self.time
    }

    pub fn is_raft_engine(&self) -> bool {
        matches!(self.wal, WalConfig::RaftEngine)
    }

    pub fn kafka_wal_broker_endpoints(&self) -> String {
        match &self.wal {
            WalConfig::RaftEngine => String::new(),
            WalConfig::Kafka {
                broker_endpoints, ..
            } => serde_json::to_string(&broker_endpoints).unwrap(),
        }
    }

    fn incr_datanode_id(&self) {
        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
    }

    fn reset_datanode_id(&self) {
        self.datanode_id.store(0, Ordering::Relaxed);
    }

    pub(crate) fn store_config(&self) -> StoreConfig {
        self.store_config.clone()
    }

    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
        if idx >= self.server_modes.len() {
            self.server_modes.resize(idx + 1, mode.clone());
        }
        self.server_modes[idx] = mode;
    }

    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
        self.server_modes.get(idx)
    }
}

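/// Displays a query [`Output`], or the client error, as text.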
struct ResultDisplayer {
    result: Result<Output, ClientError>,
}

impl Display for ResultDisplayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.result {
            Ok(result) => match &result.data {
                OutputData::AffectedRows(rows) => {
                    write!(f, "Affected Rows: {rows}")
                }
                OutputData::RecordBatches(recordbatches) => {
                    let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
                    match pretty {
                        Ok(s) => write!(f, "{s}"),
                        Err(e) => {
                            write!(f, "Failed to pretty format {recordbatches:?}, error: {e}")
                        }
                    }
                }
                OutputData::Stream(_) => unreachable!(),
            },
            Err(e) => {
                let status_code = e.status_code();
                let root_cause = e.output_msg();
                write!(
                    f,
                    "Error: {}({status_code}), {root_cause}",
                    status_code as u32
                )
            }
        }
    }
}

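/// Formats Postgres simple-query messages into the same tabular text as
/// [`ResultDisplayer`], by rebuilding the rows into string record batches.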
struct PostgresqlFormatter {
    pub rows: Vec<PgRow>,
}

impl Display for PostgresqlFormatter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.rows.is_empty() {
            return f.write_fmt(format_args!("(Empty response)"));
        }

        // Create the schema from the row description.
        let schema = match &self.rows[0] {
            PgRow::CommandComplete(affected_rows) => {
                write!(
                    f,
                    "{}",
                    ResultDisplayer {
                        result: Ok(Output::new_with_affected_rows(*affected_rows as usize)),
                    }
                )?;
                return Ok(());
            }
            PgRow::RowDescription(desc) => Arc::new(Schema::new(
                desc.iter()
                    .map(|column| {
                        ColumnSchema::new(column.name(), ConcreteDataType::string_datatype(), false)
                    })
                    .collect(),
            )),
            _ => unreachable!(),
        };
        if schema.num_columns() == 0 {
            return Ok(());
        }

        // Convert the rows to string vectors.
        let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
            .map(|_| StringVectorBuilder::with_capacity(schema.num_columns()))
            .collect();
        for row in self.rows.iter().skip(1) {
            if let PgRow::Row(row) = row {
                for (i, column) in columns.iter_mut().enumerate().take(schema.num_columns()) {
                    column.push(row.get(i));
                }
            }
        }
        let columns: Vec<VectorRef> = columns
            .into_iter()
            .map(|mut col| Arc::new(col.finish()) as VectorRef)
            .collect();

        // Construct the record batches.
        let recordbatches = RecordBatches::try_from_columns(schema, columns)
            .expect("Failed to construct recordbatches from columns. Please check the schema.");
        let result_displayer = ResultDisplayer {
            result: Ok(Output::new_with_record_batches(recordbatches)),
        };
        write!(f, "{}", result_displayer)?;

        Ok(())
    }
}

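/// Formats MySQL rows into the same tabular text as [`ResultDisplayer`], by
/// rebuilding the rows into string record batches.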
struct MysqlFormatter {
    pub rows: Vec<MySqlRow>,
}

impl Display for MysqlFormatter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.rows.is_empty() {
            return f.write_fmt(format_args!("(Empty response)"));
        }

        // Create the schema from the column names of the first row.
        let head_column = &self.rows[0];
        let head_binding = head_column.columns();
        let names = head_binding
            .iter()
            .map(|column| column.name_str())
            .collect::<Vec<Cow<str>>>();
        let schema = Arc::new(Schema::new(
            names
                .iter()
                .map(|name| {
                    ColumnSchema::new(name.to_string(), ConcreteDataType::string_datatype(), false)
                })
                .collect(),
        ));

        // Convert the rows to string vectors.
        let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
            .map(|_| StringVectorBuilder::with_capacity(schema.num_columns()))
            .collect();
        for row in self.rows.iter() {
            for (i, name) in names.iter().enumerate() {
                columns[i].push(row.get::<String, &str>(name).as_deref());
            }
        }
        let columns: Vec<VectorRef> = columns
            .into_iter()
            .map(|mut col| Arc::new(col.finish()) as VectorRef)
            .collect();

        // Construct the record batches.
        let recordbatches = RecordBatches::try_from_columns(schema, columns)
            .expect("Failed to construct recordbatches from columns. Please check the schema.");
        let result_displayer = ResultDisplayer {
            result: Ok(Output::new_with_record_batches(recordbatches)),
        };
        write!(f, "{}", result_displayer)?;

        Ok(())
    }
}