sqlness_runner/cmd/
bare.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use clap::{Parser, ValueEnum};
19use sqlness::interceptor::Registry;
20use sqlness::{ConfigBuilder, Runner};
21
22use crate::cmd::SqlnessConfig;
23use crate::env::bare::{Env, ServiceProvider, StoreConfig, WalConfig};
24use crate::{protocol_interceptor, util};
25
// Write-ahead-log backend that can be picked on the command line (`--wal`).
// `rename_all = "snake_case"` makes the accepted values `raft_engine` and
// `kafka`; `BareCommand::wal` defaults to `raft_engine`.
//
// Plain `//` comments are used on purpose: clap turns `///` doc comments on
// value-enum variants into help text, which would change the CLI output.
#[derive(ValueEnum, Debug, Clone)]
#[clap(rename_all = "snake_case")]
enum Wal {
    // Translated to `WalConfig::RaftEngine` in `BareCommand::run`.
    RaftEngine,
    // Translated to `WalConfig::Kafka { .. }` in `BareCommand::run`.
    Kafka,
}
32
// Command-line group holding the grpc/postgres/mysql server addresses. It is
// flattened into `BareCommand` and handed to `Env::new` as a whole, so that all
// server addresses travel (and are meant to be set) together.
//
// NOTE(review): `requires = "server_addr"` only makes clap reject the pg/mysql
// addresses when `--server-addr` is missing. Nothing here rejects
// `--server-addr` given on its own, although the help text below says the other
// two "must be set" in that case — confirm that the consumer of this struct
// copes with them being `None`.
#[derive(clap::Args, Debug, Clone, Default)]
pub(crate) struct ServerAddr {
    /// Address of the grpc server.
    #[clap(short, long)]
    pub(crate) server_addr: Option<String>,

    /// Address of the postgres server. Must be set if server_addr is set.
    #[clap(short, long, requires = "server_addr")]
    pub(crate) pg_server_addr: Option<String>,

    /// Address of the mysql server. Must be set if server_addr is set.
    #[clap(short, long, requires = "server_addr")]
    pub(crate) mysql_server_addr: Option<String>,
}
48
49#[derive(Debug, Parser)]
50/// Run sqlness tests in bare mode.
51pub struct BareCommand {
52    #[clap(flatten)]
53    config: SqlnessConfig,
54
55    /// Addresses of the server.
56    #[command(flatten)]
57    server_addr: ServerAddr,
58
59    /// The type of Wal.
60    #[clap(short, long, default_value = "raft_engine")]
61    wal: Wal,
62
63    /// The kafka wal broker endpoints. This config will suppress sqlness runner
64    /// from starting a kafka cluster, and use the given endpoint as kafka backend.
65    #[clap(short, long)]
66    kafka_wal_broker_endpoints: Option<String>,
67
68    /// The path to the directory where GreptimeDB's binaries resides.
69    /// If not set, sqlness will build GreptimeDB on the fly.
70    #[clap(long)]
71    bins_dir: Option<PathBuf>,
72
73    /// Preserve persistent state in the temporary directory.
74    /// This may affect future test runs.
75    #[clap(long)]
76    preserve_state: bool,
77
78    /// Pull Different versions of GreptimeDB on need.
79    #[clap(long, default_value = "true")]
80    pull_version_on_need: bool,
81
82    /// The store addresses for metadata, if empty, will use memory store.
83    #[clap(long)]
84    store_addrs: Vec<String>,
85
86    /// Whether to setup etcd, by default it is false.
87    #[clap(long, default_value = "false")]
88    setup_etcd: bool,
89
90    /// Whether to setup pg, by default it is false.
91    #[clap(long, default_missing_value = "", num_args(0..=1))]
92    setup_pg: Option<ServiceProvider>,
93
94    /// Whether to setup mysql, by default it is false.
95    #[clap(long, default_missing_value = "", num_args(0..=1))]
96    setup_mysql: Option<ServiceProvider>,
97
98    /// The number of jobs to run in parallel. Default to half of the cores.
99    #[clap(short, long, default_value = "0")]
100    jobs: usize,
101
102    /// Extra command line arguments when starting GreptimeDB binaries.
103    #[clap(long)]
104    extra_args: Vec<String>,
105}
106
107impl BareCommand {
108    pub async fn run(mut self) {
109        let temp_dir = tempfile::Builder::new()
110            .prefix("sqlness")
111            .tempdir()
112            .unwrap();
113        let sqlness_home = temp_dir.keep();
114
115        let mut interceptor_registry: Registry = Default::default();
116        interceptor_registry.register(
117            protocol_interceptor::PREFIX,
118            Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
119        );
120
121        if let Some(d) = &self.config.case_dir
122            && !d.is_dir()
123        {
124            panic!("{} is not a directory", d.display());
125        }
126        if self.jobs == 0 {
127            self.jobs = num_cpus::get() / 2;
128        }
129
130        // normalize parallelism to 1 if any of the following conditions are met:
131        // Note: parallelism in pg and mysql is possible, but need configuration.
132        if self.server_addr.server_addr.is_some()
133            || self.setup_etcd
134            || self.setup_pg.is_some()
135            || self.setup_mysql.is_some()
136            || self.kafka_wal_broker_endpoints.is_some()
137            || self.config.test_filter != ".*"
138        {
139            self.jobs = 1;
140            println!(
141                "Normalizing parallelism to 1 due to server addresses, etcd/pg/mysql setup, or test filter usage"
142            );
143        }
144
145        let config = ConfigBuilder::default()
146            .case_dir(util::get_case_dir(self.config.case_dir))
147            .fail_fast(self.config.fail_fast)
148            .test_filter(self.config.test_filter)
149            .follow_links(true)
150            .env_config_file(self.config.env_config_file)
151            .interceptor_registry(interceptor_registry)
152            .parallelism(self.jobs)
153            .build()
154            .unwrap();
155
156        let wal = match self.wal {
157            Wal::RaftEngine => WalConfig::RaftEngine,
158            Wal::Kafka => WalConfig::Kafka {
159                needs_kafka_cluster: self.kafka_wal_broker_endpoints.is_none(),
160                broker_endpoints: self
161                    .kafka_wal_broker_endpoints
162                    .map(|s| s.split(',').map(|s| s.to_string()).collect())
163                    // otherwise default to the same port in `kafka-cluster.yml`
164                    .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
165            },
166        };
167
168        let store = StoreConfig {
169            store_addrs: self.store_addrs.clone(),
170            setup_etcd: self.setup_etcd,
171            setup_pg: self.setup_pg,
172            setup_mysql: self.setup_mysql,
173        };
174
175        let runner = Runner::new(
176            config,
177            Env::new(
178                sqlness_home.clone(),
179                self.server_addr,
180                wal,
181                self.pull_version_on_need,
182                self.bins_dir,
183                store,
184                self.extra_args,
185            ),
186        );
187        match runner.run().await {
188            Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
189            Err(e) => {
190                println!("\x1b[31mTest failed: {}\x1b[0m", e);
191                std::process::exit(1);
192            }
193        }
194
195        // clean up and exit
196        if !self.preserve_state {
197            if self.setup_etcd {
198                println!("Stopping etcd");
199                util::stop_rm_etcd();
200            }
201            // TODO(weny): remove postgre and mysql containers
202            println!("Removing state in {:?}", sqlness_home);
203            tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
204        }
205    }
206}