sqlness_runner/cmd/
bare.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use clap::{Parser, ValueEnum};
19use sqlness::interceptor::Registry;
20use sqlness::{ConfigBuilder, Runner};
21
22use crate::cmd::SqlnessConfig;
23use crate::env::bare::{Env, ServiceProvider, StoreConfig, WalConfig};
24use crate::{protocol_interceptor, util};
25
26#[derive(ValueEnum, Debug, Clone)]
27#[clap(rename_all = "snake_case")]
28enum Wal {
29    RaftEngine,
30    Kafka,
31}
32
33// add a group to ensure that all server addresses are set together
34#[derive(clap::Args, Debug, Clone, Default)]
35pub(crate) struct ServerAddr {
36    /// Address of the grpc server.
37    #[clap(short, long)]
38    pub(crate) server_addr: Option<String>,
39
40    /// Address of the postgres server. Must be set if server_addr is set.
41    #[clap(short, long, requires = "server_addr")]
42    pub(crate) pg_server_addr: Option<String>,
43
44    /// Address of the mysql server. Must be set if server_addr is set.
45    #[clap(short, long, requires = "server_addr")]
46    pub(crate) mysql_server_addr: Option<String>,
47}
48
49#[derive(Debug, Parser)]
50/// Run sqlness tests in bare mode.
51pub struct BareCommand {
52    #[clap(flatten)]
53    config: SqlnessConfig,
54
55    /// Addresses of the server.
56    #[command(flatten)]
57    server_addr: ServerAddr,
58
59    /// The type of Wal.
60    #[clap(short, long, default_value = "raft_engine")]
61    wal: Wal,
62
63    /// The kafka wal broker endpoints. This config will suppress sqlness runner
64    /// from starting a kafka cluster, and use the given endpoint as kafka backend.
65    #[clap(short, long)]
66    kafka_wal_broker_endpoints: Option<String>,
67
68    /// The path to the directory where GreptimeDB's binaries resides.
69    /// If not set, sqlness will build GreptimeDB on the fly.
70    #[clap(long)]
71    bins_dir: Option<PathBuf>,
72
73    /// Preserve persistent state in the temporary directory.
74    /// This may affect future test runs.
75    #[clap(long)]
76    preserve_state: bool,
77
78    /// Pull Different versions of GreptimeDB on need.
79    #[clap(long, default_value = "true")]
80    pull_version_on_need: bool,
81
82    /// The store addresses for metadata, if empty, will use memory store.
83    #[clap(long)]
84    store_addrs: Vec<String>,
85
86    /// Whether to setup etcd, by default it is false.
87    #[clap(long, default_value = "false")]
88    setup_etcd: bool,
89
90    /// Whether to setup pg, by default it is false.
91    #[clap(long, default_missing_value = "", num_args(0..=1))]
92    setup_pg: Option<ServiceProvider>,
93
94    /// Whether to setup mysql, by default it is false.
95    #[clap(long, default_missing_value = "", num_args(0..=1))]
96    setup_mysql: Option<ServiceProvider>,
97
98    /// The number of jobs to run in parallel. Default to half of the cores.
99    #[clap(short, long, default_value = "0")]
100    jobs: usize,
101
102    /// Extra command line arguments when starting GreptimeDB binaries.
103    #[clap(long)]
104    extra_args: Vec<String>,
105
106    /// Enable flat format for storage engine (sets default_experimental_flat_format = true).
107    #[clap(long, default_value = "false")]
108    enable_flat_format: bool,
109}
110
111impl BareCommand {
112    pub async fn run(mut self) {
113        let temp_dir = tempfile::Builder::new()
114            .prefix("sqlness")
115            .tempdir()
116            .unwrap();
117        let sqlness_home = temp_dir.keep();
118
119        let mut interceptor_registry: Registry = Default::default();
120        interceptor_registry.register(
121            protocol_interceptor::PREFIX,
122            Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
123        );
124
125        if let Some(d) = &self.config.case_dir
126            && !d.is_dir()
127        {
128            panic!("{} is not a directory", d.display());
129        }
130        if self.jobs == 0 {
131            self.jobs = num_cpus::get() / 2;
132        }
133
134        // normalize parallelism to 1 if any of the following conditions are met:
135        // Note: parallelism in pg and mysql is possible, but need configuration.
136        if self.server_addr.server_addr.is_some()
137            || self.setup_etcd
138            || self.setup_pg.is_some()
139            || self.setup_mysql.is_some()
140            || self.kafka_wal_broker_endpoints.is_some()
141            || self.config.test_filter != ".*"
142        {
143            self.jobs = 1;
144            println!(
145                "Normalizing parallelism to 1 due to server addresses, etcd/pg/mysql setup, or test filter usage"
146            );
147        }
148
149        let config = ConfigBuilder::default()
150            .case_dir(util::get_case_dir(self.config.case_dir))
151            .fail_fast(self.config.fail_fast)
152            .test_filter(self.config.test_filter)
153            .follow_links(true)
154            .env_config_file(self.config.env_config_file)
155            .interceptor_registry(interceptor_registry)
156            .parallelism(self.jobs)
157            .build()
158            .unwrap();
159
160        let wal = match self.wal {
161            Wal::RaftEngine => WalConfig::RaftEngine,
162            Wal::Kafka => WalConfig::Kafka {
163                needs_kafka_cluster: self.kafka_wal_broker_endpoints.is_none(),
164                broker_endpoints: self
165                    .kafka_wal_broker_endpoints
166                    .map(|s| s.split(',').map(|s| s.to_string()).collect())
167                    // otherwise default to the same port in `kafka-cluster.yml`
168                    .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
169            },
170        };
171
172        let store = StoreConfig {
173            store_addrs: self.store_addrs.clone(),
174            setup_etcd: self.setup_etcd,
175            setup_pg: self.setup_pg,
176            setup_mysql: self.setup_mysql,
177            enable_flat_format: self.enable_flat_format,
178        };
179
180        let runner = Runner::new(
181            config,
182            Env::new(
183                sqlness_home.clone(),
184                self.server_addr,
185                wal,
186                self.pull_version_on_need,
187                self.bins_dir,
188                store,
189                self.extra_args,
190            ),
191        );
192        match runner.run().await {
193            Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
194            Err(e) => {
195                println!("\x1b[31mTest failed: {}\x1b[0m", e);
196                std::process::exit(1);
197            }
198        }
199
200        // clean up and exit
201        if !self.preserve_state {
202            if self.setup_etcd {
203                println!("Stopping etcd");
204                util::stop_rm_etcd();
205            }
206            // TODO(weny): remove postgre and mysql containers
207            println!("Removing state in {:?}", sqlness_home);
208            tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
209        }
210    }
211}