sqlness_runner/cmd/
bare.rs1use std::path::PathBuf;
16use std::sync::Arc;
17
18use clap::{Parser, ValueEnum};
19use sqlness::interceptor::Registry;
20use sqlness::{ConfigBuilder, Runner};
21
22use crate::cmd::SqlnessConfig;
23use crate::env::bare::{Env, ServiceProvider, StoreConfig, WalConfig};
24use crate::{protocol_interceptor, util};
25
26#[derive(ValueEnum, Debug, Clone)]
27#[clap(rename_all = "snake_case")]
28enum Wal {
29 RaftEngine,
30 Kafka,
31}
32
33#[derive(clap::Args, Debug, Clone, Default)]
35pub(crate) struct ServerAddr {
36 #[clap(short, long)]
38 pub(crate) server_addr: Option<String>,
39
40 #[clap(short, long, requires = "server_addr")]
42 pub(crate) pg_server_addr: Option<String>,
43
44 #[clap(short, long, requires = "server_addr")]
46 pub(crate) mysql_server_addr: Option<String>,
47}
48
49#[derive(Debug, Parser)]
50pub struct BareCommand {
52 #[clap(flatten)]
53 config: SqlnessConfig,
54
55 #[command(flatten)]
57 server_addr: ServerAddr,
58
59 #[clap(short, long, default_value = "raft_engine")]
61 wal: Wal,
62
63 #[clap(short, long)]
66 kafka_wal_broker_endpoints: Option<String>,
67
68 #[clap(long)]
71 bins_dir: Option<PathBuf>,
72
73 #[clap(long)]
76 preserve_state: bool,
77
78 #[clap(long, default_value = "true")]
80 pull_version_on_need: bool,
81
82 #[clap(long)]
84 store_addrs: Vec<String>,
85
86 #[clap(long, default_value = "false")]
88 setup_etcd: bool,
89
90 #[clap(long, default_missing_value = "", num_args(0..=1))]
92 setup_pg: Option<ServiceProvider>,
93
94 #[clap(long, default_missing_value = "", num_args(0..=1))]
96 setup_mysql: Option<ServiceProvider>,
97
98 #[clap(short, long, default_value = "0")]
100 jobs: usize,
101
102 #[clap(long)]
104 extra_args: Vec<String>,
105}
106
107impl BareCommand {
108 pub async fn run(mut self) {
109 let temp_dir = tempfile::Builder::new()
110 .prefix("sqlness")
111 .tempdir()
112 .unwrap();
113 let sqlness_home = temp_dir.keep();
114
115 let mut interceptor_registry: Registry = Default::default();
116 interceptor_registry.register(
117 protocol_interceptor::PREFIX,
118 Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
119 );
120
121 if let Some(d) = &self.config.case_dir
122 && !d.is_dir()
123 {
124 panic!("{} is not a directory", d.display());
125 }
126 if self.jobs == 0 {
127 self.jobs = num_cpus::get() / 2;
128 }
129
130 if self.server_addr.server_addr.is_some()
133 || self.setup_etcd
134 || self.setup_pg.is_some()
135 || self.setup_mysql.is_some()
136 || self.kafka_wal_broker_endpoints.is_some()
137 || self.config.test_filter != ".*"
138 {
139 self.jobs = 1;
140 println!(
141 "Normalizing parallelism to 1 due to server addresses, etcd/pg/mysql setup, or test filter usage"
142 );
143 }
144
145 let config = ConfigBuilder::default()
146 .case_dir(util::get_case_dir(self.config.case_dir))
147 .fail_fast(self.config.fail_fast)
148 .test_filter(self.config.test_filter)
149 .follow_links(true)
150 .env_config_file(self.config.env_config_file)
151 .interceptor_registry(interceptor_registry)
152 .parallelism(self.jobs)
153 .build()
154 .unwrap();
155
156 let wal = match self.wal {
157 Wal::RaftEngine => WalConfig::RaftEngine,
158 Wal::Kafka => WalConfig::Kafka {
159 needs_kafka_cluster: self.kafka_wal_broker_endpoints.is_none(),
160 broker_endpoints: self
161 .kafka_wal_broker_endpoints
162 .map(|s| s.split(',').map(|s| s.to_string()).collect())
163 .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
165 },
166 };
167
168 let store = StoreConfig {
169 store_addrs: self.store_addrs.clone(),
170 setup_etcd: self.setup_etcd,
171 setup_pg: self.setup_pg,
172 setup_mysql: self.setup_mysql,
173 };
174
175 let runner = Runner::new(
176 config,
177 Env::new(
178 sqlness_home.clone(),
179 self.server_addr,
180 wal,
181 self.pull_version_on_need,
182 self.bins_dir,
183 store,
184 self.extra_args,
185 ),
186 );
187 match runner.run().await {
188 Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
189 Err(e) => {
190 println!("\x1b[31mTest failed: {}\x1b[0m", e);
191 std::process::exit(1);
192 }
193 }
194
195 if !self.preserve_state {
197 if self.setup_etcd {
198 println!("Stopping etcd");
199 util::stop_rm_etcd();
200 }
201 println!("Removing state in {:?}", sqlness_home);
203 tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
204 }
205 }
206}