sqlness_runner/
env.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::collections::HashMap;
17use std::fmt::Display;
18use std::fs::OpenOptions;
19use std::io;
20use std::io::Write;
21use std::net::SocketAddr;
22use std::path::{Path, PathBuf};
23use std::process::{Child, Command};
24use std::sync::atomic::{AtomicU32, Ordering};
25use std::sync::{Arc, Mutex};
26use std::time::Duration;
27
28use async_trait::async_trait;
29use client::error::ServerSnafu;
30use client::{
31    Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database as DB, Error as ClientError,
32};
33use common_error::ext::ErrorExt;
34use common_query::{Output, OutputData};
35use common_recordbatch::RecordBatches;
36use datatypes::data_type::ConcreteDataType;
37use datatypes::scalars::ScalarVectorBuilder;
38use datatypes::schema::{ColumnSchema, Schema};
39use datatypes::vectors::{StringVectorBuilder, VectorRef};
40use mysql::prelude::Queryable;
41use mysql::{Conn as MySqlClient, Row as MySqlRow};
42use sqlness::{Database, EnvController, QueryContext};
43use tokio::sync::Mutex as TokioMutex;
44use tokio_postgres::{Client as PgClient, SimpleQueryMessage as PgRow};
45
46use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
47use crate::server_mode::ServerMode;
48use crate::util::{PROGRAM, get_workspace_root, maybe_pull_binary};
49use crate::{ServerAddr, util};
50
51// standalone mode
52const SERVER_MODE_STANDALONE_IDX: usize = 0;
53// distributed mode
54const SERVER_MODE_METASRV_IDX: usize = 0;
55const SERVER_MODE_DATANODE_START_IDX: usize = 1;
56const SERVER_MODE_FRONTEND_IDX: usize = 4;
57const SERVER_MODE_FLOWNODE_IDX: usize = 5;
58
59#[derive(Clone)]
60pub enum WalConfig {
61    RaftEngine,
62    Kafka {
63        /// Indicates whether the runner needs to start a kafka cluster
64        /// (it might be available in the external system environment).
65        needs_kafka_cluster: bool,
66        broker_endpoints: Vec<String>,
67    },
68}
69
70#[derive(Clone)]
71pub struct StoreConfig {
72    pub store_addrs: Vec<String>,
73    pub setup_etcd: bool,
74    pub setup_pg: bool,
75    pub setup_mysql: bool,
76}
77
78#[derive(Clone)]
79pub struct Env {
80    sqlness_home: PathBuf,
81    server_addrs: ServerAddr,
82    wal: WalConfig,
83
84    /// The path to the directory that contains the pre-built GreptimeDB binary.
85    /// When running in CI, this is expected to be set.
86    /// If not set, this runner will build the GreptimeDB binary itself when needed, and set this field by then.
87    bins_dir: Arc<Mutex<Option<PathBuf>>>,
88    /// The path to the directory that contains the old pre-built GreptimeDB binaries.
89    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
90    /// Pull different versions of GreptimeDB on need.
91    pull_version_on_need: bool,
92    /// Store address for metasrv metadata
93    store_config: StoreConfig,
94}
95
96#[async_trait]
97impl EnvController for Env {
98    type DB = GreptimeDB;
99
100    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
101        if self.server_addrs.server_addr.is_some() && id > 0 {
102            panic!("Parallel test mode is not supported when server address is already set.");
103        }
104
105        unsafe {
106            std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
107        }
108        match mode {
109            "standalone" => self.start_standalone(id).await,
110            "distributed" => self.start_distributed(id).await,
111            _ => panic!("Unexpected mode: {mode}"),
112        }
113    }
114
115    /// Stop one [`Database`].
116    async fn stop(&self, _mode: &str, mut database: Self::DB) {
117        database.stop();
118    }
119}
120
121impl Env {
122    pub fn new(
123        data_home: PathBuf,
124        server_addrs: ServerAddr,
125        wal: WalConfig,
126        pull_version_on_need: bool,
127        bins_dir: Option<PathBuf>,
128        store_config: StoreConfig,
129    ) -> Self {
130        Self {
131            sqlness_home: data_home,
132            server_addrs,
133            wal,
134            pull_version_on_need,
135            bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
136            versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
137                "latest".to_string(),
138                bins_dir.clone().unwrap_or(util::get_binary_dir("debug")),
139            )]))),
140            store_config,
141        }
142    }
143
144    async fn start_standalone(&self, id: usize) -> GreptimeDB {
145        println!("Starting standalone instance id: {id}");
146
147        if self.server_addrs.server_addr.is_some() {
148            self.connect_db(&self.server_addrs, id).await
149        } else {
150            self.build_db();
151            self.setup_wal();
152            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
153
154            let server_mode = ServerMode::random_standalone();
155            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
156            let server_addr = server_mode.server_addr().unwrap();
157            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;
158
159            let mut greptimedb = self.connect_db(&server_addr, id).await;
160            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
161            greptimedb.is_standalone = true;
162            greptimedb.ctx = db_ctx;
163
164            greptimedb
165        }
166    }
167
168    async fn start_distributed(&self, id: usize) -> GreptimeDB {
169        if self.server_addrs.server_addr.is_some() {
170            self.connect_db(&self.server_addrs, id).await
171        } else {
172            self.build_db();
173            self.setup_wal();
174            self.setup_etcd();
175            self.setup_pg();
176            self.setup_mysql().await;
177            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
178
179            // start a distributed GreptimeDB
180            let meta_server_mode = ServerMode::random_metasrv();
181            let metasrv_port = match &meta_server_mode {
182                ServerMode::Metasrv {
183                    rpc_server_addr, ..
184                } => rpc_server_addr
185                    .split(':')
186                    .nth(1)
187                    .unwrap()
188                    .parse::<u16>()
189                    .unwrap(),
190                _ => panic!(
191                    "metasrv mode not set, maybe running in remote mode which doesn't support restart?"
192                ),
193            };
194            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
195            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;
196
197            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
198            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
199            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
200            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
201            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
202            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
203            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
204            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
205            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;
206
207            let frontend_mode = ServerMode::random_frontend(metasrv_port);
208            let server_addr = frontend_mode.server_addr().unwrap();
209            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
210            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;
211
212            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
213            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
214            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;
215
216            let mut greptimedb = self.connect_db(&server_addr, id).await;
217
218            greptimedb.metasrv_process = Some(meta_server).into();
219            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
220                datanode_1, datanode_2, datanode_3,
221            ])));
222            greptimedb.frontend_process = Some(frontend).into();
223            greptimedb.flownode_process = Some(flownode).into();
224            greptimedb.is_standalone = false;
225            greptimedb.ctx = db_ctx;
226
227            greptimedb
228        }
229    }
230
231    async fn create_pg_client(&self, pg_server_addr: &str) -> PgClient {
232        let sockaddr: SocketAddr = pg_server_addr.parse().expect(
233            "Failed to parse the Postgres server address. Please check if the address is in the format of `ip:port`.",
234        );
235        let mut config = tokio_postgres::config::Config::new();
236        config.host(sockaddr.ip().to_string());
237        config.port(sockaddr.port());
238        config.dbname(DEFAULT_SCHEMA_NAME);
239
240        // retry to connect to Postgres server until success
241        const MAX_RETRY: usize = 3;
242        let mut backoff = Duration::from_millis(500);
243        for _ in 0..MAX_RETRY {
244            if let Ok((pg_client, conn)) = config.connect(tokio_postgres::NoTls).await {
245                tokio::spawn(conn);
246                return pg_client;
247            }
248            tokio::time::sleep(backoff).await;
249            backoff *= 2;
250        }
251        panic!("Failed to connect to Postgres server. Please check if the server is running.");
252    }
253
254    async fn create_mysql_client(&self, mysql_server_addr: &str) -> MySqlClient {
255        let sockaddr: SocketAddr = mysql_server_addr.parse().expect(
256            "Failed to parse the MySQL server address. Please check if the address is in the format of `ip:port`.",
257        );
258        let ops = mysql::OptsBuilder::new()
259            .ip_or_hostname(Some(sockaddr.ip().to_string()))
260            .tcp_port(sockaddr.port())
261            .db_name(Some(DEFAULT_SCHEMA_NAME));
262        // retry to connect to MySQL server until success
263        const MAX_RETRY: usize = 3;
264        let mut backoff = Duration::from_millis(500);
265
266        for _ in 0..MAX_RETRY {
267            // exponential backoff
268            if let Ok(client) = mysql::Conn::new(ops.clone()) {
269                return client;
270            }
271            tokio::time::sleep(backoff).await;
272            backoff *= 2;
273        }
274
275        panic!("Failed to connect to MySQL server. Please check if the server is running.")
276    }
277
278    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
279        let grpc_server_addr = server_addr.server_addr.clone().unwrap();
280        let pg_server_addr = server_addr.pg_server_addr.clone().unwrap();
281        let mysql_server_addr = server_addr.mysql_server_addr.clone().unwrap();
282
283        let grpc_client = Client::with_urls(vec![grpc_server_addr.clone()]);
284        let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
285        let pg_client = self.create_pg_client(&pg_server_addr).await;
286        let mysql_client = self.create_mysql_client(&mysql_server_addr).await;
287
288        GreptimeDB {
289            grpc_client: TokioMutex::new(db),
290            pg_client: TokioMutex::new(pg_client),
291            mysql_client: TokioMutex::new(mysql_client),
292            server_processes: None,
293            metasrv_process: None.into(),
294            frontend_process: None.into(),
295            flownode_process: None.into(),
296            ctx: GreptimeDBContext {
297                time: 0,
298                datanode_id: Default::default(),
299                wal: self.wal.clone(),
300                store_config: self.store_config.clone(),
301                server_modes: Vec::new(),
302            },
303            is_standalone: false,
304            env: self.clone(),
305            id,
306        }
307    }
308
309    fn stop_server(process: &mut Child) {
310        let _ = process.kill();
311        let _ = process.wait();
312    }
313
314    async fn start_server(
315        &self,
316        mode: ServerMode,
317        db_ctx: &GreptimeDBContext,
318        id: usize,
319        truncate_log: bool,
320    ) -> Child {
321        let log_file_name = match mode {
322            ServerMode::Datanode { node_id, .. } => {
323                db_ctx.incr_datanode_id();
324                format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
325            }
326            ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
327            ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
328            ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
329            ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
330        };
331        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();
332
333        println!("DB instance {id} log file at {stdout_file_name}");
334
335        let stdout_file = OpenOptions::new()
336            .create(true)
337            .write(true)
338            .truncate(truncate_log)
339            .append(!truncate_log)
340            .open(stdout_file_name)
341            .unwrap();
342
343        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
344        let check_ip_addrs = mode.check_addrs();
345
346        for check_ip_addr in &check_ip_addrs {
347            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
348                panic!(
349                    "Port {check_ip_addr} is already in use, please check and retry.",
350                    check_ip_addr = check_ip_addr
351                );
352            }
353        }
354
355        let program = PROGRAM;
356
357        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
358            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
359        );
360
361        let abs_bins_dir = bins_dir
362            .canonicalize()
363            .expect("Failed to canonicalize bins_dir");
364
365        let mut process = Command::new(abs_bins_dir.join(program))
366            .current_dir(bins_dir.clone())
367            .env("TZ", "UTC")
368            .args(args)
369            .stdout(stdout_file)
370            .spawn()
371            .unwrap_or_else(|error| {
372                panic!(
373                    "Failed to start the DB with subcommand {}, Error: {error}, path: {:?}",
374                    mode.name(),
375                    bins_dir.join(program)
376                );
377            });
378
379        for check_ip_addr in &check_ip_addrs {
380            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
381                Env::stop_server(&mut process);
382                panic!("{} doesn't up in 10 seconds, quit.", mode.name())
383            }
384        }
385
386        process
387    }
388
389    /// stop and restart the server process
390    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
391        {
392            if let Some(server_process) = db.server_processes.clone() {
393                let mut server_processes = server_process.lock().unwrap();
394                for server_process in server_processes.iter_mut() {
395                    Env::stop_server(server_process);
396                }
397            }
398
399            if is_full_restart {
400                if let Some(mut metasrv_process) =
401                    db.metasrv_process.lock().expect("poisoned lock").take()
402                {
403                    Env::stop_server(&mut metasrv_process);
404                }
405                if let Some(mut frontend_process) =
406                    db.frontend_process.lock().expect("poisoned lock").take()
407                {
408                    Env::stop_server(&mut frontend_process);
409                }
410            }
411
412            if let Some(mut flownode_process) =
413                db.flownode_process.lock().expect("poisoned lock").take()
414            {
415                Env::stop_server(&mut flownode_process);
416            }
417        }
418
419        // check if the server is distributed or standalone
420        let new_server_processes = if db.is_standalone {
421            let server_mode = db
422                .ctx
423                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
424                .cloned()
425                .unwrap();
426            let server_addr = server_mode.server_addr().unwrap();
427            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;
428
429            *db.pg_client.lock().await = self
430                .create_pg_client(&server_addr.pg_server_addr.unwrap())
431                .await;
432            *db.mysql_client.lock().await = self
433                .create_mysql_client(&server_addr.mysql_server_addr.unwrap())
434                .await;
435            vec![new_server_process]
436        } else {
437            db.ctx.reset_datanode_id();
438            if is_full_restart {
439                let metasrv_mode = db
440                    .ctx
441                    .get_server_mode(SERVER_MODE_METASRV_IDX)
442                    .cloned()
443                    .unwrap();
444                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
445                db.metasrv_process
446                    .lock()
447                    .expect("lock poisoned")
448                    .replace(metasrv);
449
450                // wait for metasrv to start
451                // since it seems older version of db might take longer to complete election
452                tokio::time::sleep(Duration::from_secs(5)).await;
453            }
454
455            let mut processes = vec![];
456            for i in 0..3 {
457                let datanode_mode = db
458                    .ctx
459                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
460                    .cloned()
461                    .unwrap();
462                let new_server_process = self
463                    .start_server(datanode_mode, &db.ctx, db.id, false)
464                    .await;
465                processes.push(new_server_process);
466            }
467
468            if is_full_restart {
469                let frontend_mode = db
470                    .ctx
471                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
472                    .cloned()
473                    .unwrap();
474                let frontend = self
475                    .start_server(frontend_mode, &db.ctx, db.id, false)
476                    .await;
477                db.frontend_process
478                    .lock()
479                    .expect("lock poisoned")
480                    .replace(frontend);
481            }
482
483            let flownode_mode = db
484                .ctx
485                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
486                .cloned()
487                .unwrap();
488            let flownode = self
489                .start_server(flownode_mode, &db.ctx, db.id, false)
490                .await;
491            db.flownode_process
492                .lock()
493                .expect("lock poisoned")
494                .replace(flownode);
495
496            processes
497        };
498
499        if let Some(server_processes) = db.server_processes.clone() {
500            let mut server_processes = server_processes.lock().unwrap();
501            *server_processes = new_server_processes;
502        }
503    }
504
505    /// Setup kafka wal cluster if needed. The counterpart is in [GreptimeDB::stop].
506    fn setup_wal(&self) {
507        if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
508            util::setup_wal();
509        }
510    }
511
512    /// Setup etcd if needed.
513    fn setup_etcd(&self) {
514        if self.store_config.setup_etcd {
515            let client_ports = self
516                .store_config
517                .store_addrs
518                .iter()
519                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
520                .collect::<Vec<_>>();
521            util::setup_etcd(client_ports, None, None);
522        }
523    }
524
525    /// Setup PostgreSql if needed.
526    fn setup_pg(&self) {
527        if self.store_config.setup_pg {
528            let client_ports = self
529                .store_config
530                .store_addrs
531                .iter()
532                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
533                .collect::<Vec<_>>();
534            let client_port = client_ports.first().unwrap_or(&5432);
535            util::setup_pg(*client_port, None);
536        }
537    }
538
539    /// Setup MySql if needed.
540    async fn setup_mysql(&self) {
541        if self.store_config.setup_mysql {
542            let client_ports = self
543                .store_config
544                .store_addrs
545                .iter()
546                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
547                .collect::<Vec<_>>();
548            let client_port = client_ports.first().unwrap_or(&3306);
549            util::setup_mysql(*client_port, None);
550
551            // Docker of MySQL starts slowly, so we need to wait for a while
552            tokio::time::sleep(Duration::from_secs(10)).await;
553        }
554    }
555
556    /// Build the DB with `cargo build --bin greptime`
557    fn build_db(&self) {
558        if self.bins_dir.lock().unwrap().is_some() {
559            return;
560        }
561
562        println!("Going to build the DB...");
563        let output = Command::new("cargo")
564            .current_dir(util::get_workspace_root())
565            .args([
566                "build",
567                "--bin",
568                "greptime",
569                "--features",
570                "pg_kvbackend,mysql_kvbackend",
571            ])
572            .output()
573            .expect("Failed to start GreptimeDB");
574        if !output.status.success() {
575            println!("Failed to build GreptimeDB, {}", output.status);
576            println!("Cargo build stdout:");
577            io::stdout().write_all(&output.stdout).unwrap();
578            println!("Cargo build stderr:");
579            io::stderr().write_all(&output.stderr).unwrap();
580            panic!();
581        }
582
583        let _ = self
584            .bins_dir
585            .lock()
586            .unwrap()
587            .insert(util::get_binary_dir("debug"));
588    }
589}
590
591pub struct GreptimeDB {
592    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
593    metasrv_process: Mutex<Option<Child>>,
594    frontend_process: Mutex<Option<Child>>,
595    flownode_process: Mutex<Option<Child>>,
596    grpc_client: TokioMutex<DB>,
597    pg_client: TokioMutex<PgClient>,
598    mysql_client: TokioMutex<MySqlClient>,
599    ctx: GreptimeDBContext,
600    is_standalone: bool,
601    env: Env,
602    id: usize,
603}
604
605impl GreptimeDB {
606    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
607        let client = self.pg_client.lock().await;
608        match client.simple_query(&query).await {
609            Ok(rows) => Box::new(PostgresqlFormatter { rows }),
610            Err(e) => Box::new(format!("Failed to execute query, encountered: {:?}", e)),
611        }
612    }
613
614    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
615        let mut conn = self.mysql_client.lock().await;
616        let result = conn.query_iter(query);
617        Box::new(match result {
618            Ok(result) => {
619                let mut rows = vec![];
620                let affected_rows = result.affected_rows();
621                for row in result {
622                    match row {
623                        Ok(r) => rows.push(r),
624                        Err(e) => {
625                            return Box::new(format!("Failed to parse query result, err: {:?}", e));
626                        }
627                    }
628                }
629
630                if rows.is_empty() {
631                    format!("affected_rows: {}", affected_rows)
632                } else {
633                    format!("{}", MysqlFormatter { rows })
634                }
635            }
636            Err(e) => format!("Failed to execute query, err: {:?}", e),
637        })
638    }
639
640    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
641        let mut client = self.grpc_client.lock().await;
642
643        let query_str = query.trim().to_lowercase();
644
645        if query_str.starts_with("use ") {
646            // use [db]
647            let database = query
648                .split_ascii_whitespace()
649                .nth(1)
650                .expect("Illegal `USE` statement: expecting a database.")
651                .trim_end_matches(';');
652            client.set_schema(database);
653            Box::new(ResultDisplayer {
654                result: Ok(Output::new_with_affected_rows(0)),
655            }) as _
656        } else if query_str.starts_with("set time_zone")
657            || query_str.starts_with("set session time_zone")
658            || query_str.starts_with("set local time_zone")
659        {
660            // set time_zone='xxx'
661            let timezone = query
662                .split('=')
663                .nth(1)
664                .expect("Illegal `SET TIMEZONE` statement: expecting a timezone expr.")
665                .trim()
666                .strip_prefix('\'')
667                .unwrap()
668                .strip_suffix("';")
669                .unwrap();
670
671            client.set_timezone(timezone);
672
673            Box::new(ResultDisplayer {
674                result: Ok(Output::new_with_affected_rows(0)),
675            }) as _
676        } else {
677            let mut result = client.sql(&query).await;
678            if let Ok(Output {
679                data: OutputData::Stream(stream),
680                ..
681            }) = result
682            {
683                match RecordBatches::try_collect(stream).await {
684                    Ok(recordbatches) => {
685                        result = Ok(Output::new_with_record_batches(recordbatches));
686                    }
687                    Err(e) => {
688                        let status_code = e.status_code();
689                        let msg = e.output_msg();
690                        result = ServerSnafu {
691                            code: status_code,
692                            msg,
693                        }
694                        .fail();
695                    }
696                }
697            }
698            Box::new(ResultDisplayer { result }) as _
699        }
700    }
701}
702
703#[async_trait]
704impl Database for GreptimeDB {
705    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
706        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
707            self.env.restart_server(self, false).await;
708        } else if let Some(version) = ctx.context.get("version") {
709            let version_bin_dir = self
710                .env
711                .versioned_bins_dirs
712                .lock()
713                .expect("lock poison")
714                .get(version.as_str())
715                .cloned();
716
717            match version_bin_dir {
718                Some(path) if path.clone().join(PROGRAM).is_file() => {
719                    // use version in versioned_bins_dirs
720                    *self.env.bins_dir.lock().unwrap() = Some(path.clone());
721                }
722                _ => {
723                    // use version in dir files
724                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
725                    let root = get_workspace_root();
726                    let new_path = PathBuf::from_iter([&root, version]);
727                    *self.env.bins_dir.lock().unwrap() = Some(new_path);
728                }
729            }
730
731            self.env.restart_server(self, true).await;
732            // sleep for a while to wait for the server to fully boot up
733            tokio::time::sleep(Duration::from_secs(5)).await;
734        }
735
736        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
737            // protocol is bound to be either "mysql" or "postgres"
738            if protocol == MYSQL {
739                self.mysql_query(ctx, query).await
740            } else {
741                self.postgres_query(ctx, query).await
742            }
743        } else {
744            self.grpc_query(ctx, query).await
745        }
746    }
747}
748
749impl GreptimeDB {
750    fn stop(&mut self) {
751        if let Some(server_processes) = self.server_processes.clone() {
752            let mut server_processes = server_processes.lock().unwrap();
753            for mut server_process in server_processes.drain(..) {
754                Env::stop_server(&mut server_process);
755                println!(
756                    "Standalone or Datanode (pid = {}) is stopped",
757                    server_process.id()
758                );
759            }
760        }
761        if let Some(mut metasrv) = self
762            .metasrv_process
763            .lock()
764            .expect("someone else panic when holding lock")
765            .take()
766        {
767            Env::stop_server(&mut metasrv);
768            println!("Metasrv (pid = {}) is stopped", metasrv.id());
769        }
770        if let Some(mut frontend) = self
771            .frontend_process
772            .lock()
773            .expect("someone else panic when holding lock")
774            .take()
775        {
776            Env::stop_server(&mut frontend);
777            println!("Frontend (pid = {}) is stopped", frontend.id());
778        }
779        if let Some(mut flownode) = self
780            .flownode_process
781            .lock()
782            .expect("someone else panic when holding lock")
783            .take()
784        {
785            Env::stop_server(&mut flownode);
786            println!("Flownode (pid = {}) is stopped", flownode.id());
787        }
788        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
789        {
790            util::teardown_wal();
791        }
792    }
793}
794
795impl Drop for GreptimeDB {
796    fn drop(&mut self) {
797        if self.env.server_addrs.server_addr.is_none() {
798            self.stop();
799        }
800    }
801}
802
803pub struct GreptimeDBContext {
804    /// Start time in millisecond
805    time: i64,
806    datanode_id: AtomicU32,
807    wal: WalConfig,
808    store_config: StoreConfig,
809    server_modes: Vec<ServerMode>,
810}
811
812impl GreptimeDBContext {
813    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
814        Self {
815            time: common_time::util::current_time_millis(),
816            datanode_id: AtomicU32::new(0),
817            wal,
818            store_config,
819            server_modes: Vec::new(),
820        }
821    }
822
823    pub(crate) fn time(&self) -> i64 {
824        self.time
825    }
826
827    pub fn is_raft_engine(&self) -> bool {
828        matches!(self.wal, WalConfig::RaftEngine)
829    }
830
831    pub fn kafka_wal_broker_endpoints(&self) -> String {
832        match &self.wal {
833            WalConfig::RaftEngine => String::new(),
834            WalConfig::Kafka {
835                broker_endpoints, ..
836            } => serde_json::to_string(&broker_endpoints).unwrap(),
837        }
838    }
839
840    fn incr_datanode_id(&self) {
841        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
842    }
843
844    fn reset_datanode_id(&self) {
845        self.datanode_id.store(0, Ordering::Relaxed);
846    }
847
848    pub(crate) fn store_config(&self) -> StoreConfig {
849        self.store_config.clone()
850    }
851
852    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
853        if idx >= self.server_modes.len() {
854            self.server_modes.resize(idx + 1, mode.clone());
855        }
856        self.server_modes[idx] = mode;
857    }
858
859    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
860        self.server_modes.get(idx)
861    }
862}
863
864struct ResultDisplayer {
865    result: Result<Output, ClientError>,
866}
867
868impl Display for ResultDisplayer {
869    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
870        match &self.result {
871            Ok(result) => match &result.data {
872                OutputData::AffectedRows(rows) => {
873                    write!(f, "Affected Rows: {rows}")
874                }
875                OutputData::RecordBatches(recordbatches) => {
876                    let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
877                    match pretty {
878                        Ok(s) => write!(f, "{s}"),
879                        Err(e) => {
880                            write!(f, "Failed to pretty format {recordbatches:?}, error: {e}")
881                        }
882                    }
883                }
884                OutputData::Stream(_) => unreachable!(),
885            },
886            Err(e) => {
887                let status_code = e.status_code();
888                let root_cause = e.output_msg();
889                write!(
890                    f,
891                    "Error: {}({status_code}), {root_cause}",
892                    status_code as u32
893                )
894            }
895        }
896    }
897}
898
899struct PostgresqlFormatter {
900    pub rows: Vec<PgRow>,
901}
902
903impl Display for PostgresqlFormatter {
904    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
905        if self.rows.is_empty() {
906            return f.write_fmt(format_args!("(Empty response)"));
907        }
908
909        // create schema
910        let schema = match &self.rows[0] {
911            PgRow::CommandComplete(affected_rows) => {
912                write!(
913                    f,
914                    "{}",
915                    ResultDisplayer {
916                        result: Ok(Output::new_with_affected_rows(*affected_rows as usize)),
917                    }
918                )?;
919                return Ok(());
920            }
921            PgRow::RowDescription(desc) => Arc::new(Schema::new(
922                desc.iter()
923                    .map(|column| {
924                        ColumnSchema::new(column.name(), ConcreteDataType::string_datatype(), false)
925                    })
926                    .collect(),
927            )),
928            _ => unreachable!(),
929        };
930        if schema.num_columns() == 0 {
931            return Ok(());
932        }
933
934        // convert to string vectors
935        let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
936            .map(|_| StringVectorBuilder::with_capacity(schema.num_columns()))
937            .collect();
938        for row in self.rows.iter().skip(1) {
939            if let PgRow::Row(row) = row {
940                for (i, column) in columns.iter_mut().enumerate().take(schema.num_columns()) {
941                    column.push(row.get(i));
942                }
943            }
944        }
945        let columns: Vec<VectorRef> = columns
946            .into_iter()
947            .map(|mut col| Arc::new(col.finish()) as VectorRef)
948            .collect();
949
950        // construct recordbatch
951        let recordbatches = RecordBatches::try_from_columns(schema, columns)
952            .expect("Failed to construct recordbatches from columns. Please check the schema.");
953        let result_displayer = ResultDisplayer {
954            result: Ok(Output::new_with_record_batches(recordbatches)),
955        };
956        write!(f, "{}", result_displayer)?;
957
958        Ok(())
959    }
960}
961
962struct MysqlFormatter {
963    pub rows: Vec<MySqlRow>,
964}
965
966impl Display for MysqlFormatter {
967    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
968        if self.rows.is_empty() {
969            return f.write_fmt(format_args!("(Empty response)"));
970        }
971        // create schema
972        let head_column = &self.rows[0];
973        let head_binding = head_column.columns();
974        let names = head_binding
975            .iter()
976            .map(|column| column.name_str())
977            .collect::<Vec<Cow<str>>>();
978        let schema = Arc::new(Schema::new(
979            names
980                .iter()
981                .map(|name| {
982                    ColumnSchema::new(name.to_string(), ConcreteDataType::string_datatype(), false)
983                })
984                .collect(),
985        ));
986
987        // convert to string vectors
988        let mut columns: Vec<StringVectorBuilder> = (0..schema.num_columns())
989            .map(|_| StringVectorBuilder::with_capacity(schema.num_columns()))
990            .collect();
991        for row in self.rows.iter() {
992            for (i, name) in names.iter().enumerate() {
993                columns[i].push(row.get::<String, &str>(name).as_deref());
994            }
995        }
996        let columns: Vec<VectorRef> = columns
997            .into_iter()
998            .map(|mut col| Arc::new(col.finish()) as VectorRef)
999            .collect();
1000
1001        // construct recordbatch
1002        let recordbatches = RecordBatches::try_from_columns(schema, columns)
1003            .expect("Failed to construct recordbatches from columns. Please check the schema.");
1004        let result_displayer = ResultDisplayer {
1005            result: Ok(Output::new_with_record_batches(recordbatches)),
1006        };
1007        write!(f, "{}", result_displayer)?;
1008
1009        Ok(())
1010    }
1011}