1#![allow(clippy::print_stdout)]
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use clap::{Parser, ValueEnum};
21use env::{Env, WalConfig};
22use sqlness::interceptor::Registry;
23use sqlness::{ConfigBuilder, Runner};
24
25use crate::env::StoreConfig;
26
27mod env;
28mod protocol_interceptor;
29mod server_mode;
30mod util;
31
32#[derive(ValueEnum, Debug, Clone)]
33#[clap(rename_all = "snake_case")]
34enum Wal {
35 RaftEngine,
36 Kafka,
37}
38
39#[derive(clap::Args, Debug, Clone, Default)]
41#[group(multiple = true, requires_all=["server_addr", "pg_server_addr", "mysql_server_addr"])]
42struct ServerAddr {
43 #[clap(short, long)]
45 server_addr: Option<String>,
46
47 #[clap(short, long, requires = "server_addr")]
49 pg_server_addr: Option<String>,
50
51 #[clap(short, long, requires = "server_addr")]
53 mysql_server_addr: Option<String>,
54}
55
56#[derive(Parser, Debug)]
57#[clap(author, version, about, long_about = None)]
58struct Args {
60 #[clap(short, long)]
62 case_dir: Option<PathBuf>,
63
64 #[arg(short, long, default_value = "false")]
66 fail_fast: bool,
67
68 #[clap(short, long, default_value = "config.toml")]
70 env_config_file: String,
71
72 #[clap(short, long, default_value = ".*")]
74 test_filter: String,
75
76 #[command(flatten)]
78 server_addr: ServerAddr,
79
80 #[clap(short, long, default_value = "raft_engine")]
82 wal: Wal,
83
84 #[clap(short, long)]
87 kafka_wal_broker_endpoints: Option<String>,
88
89 #[clap(long)]
92 bins_dir: Option<PathBuf>,
93
94 #[clap(long)]
97 preserve_state: bool,
98
99 #[clap(long, default_value = "true")]
101 pull_version_on_need: bool,
102
103 #[clap(long)]
105 store_addrs: Vec<String>,
106
107 #[clap(long, default_value = "false")]
109 setup_etcd: bool,
110
111 #[clap(long, default_value = "false")]
113 setup_pg: bool,
114
115 #[clap(long, default_value = "false")]
117 setup_mysql: bool,
118
119 #[clap(short, long, default_value = "0")]
121 jobs: usize,
122}
123
124#[tokio::main]
125async fn main() {
126 let mut args = Args::parse();
127
128 let temp_dir = tempfile::Builder::new()
129 .prefix("sqlness")
130 .tempdir()
131 .unwrap();
132 let sqlness_home = temp_dir.into_path();
133
134 let mut interceptor_registry: Registry = Default::default();
135 interceptor_registry.register(
136 protocol_interceptor::PREFIX,
137 Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
138 );
139
140 if let Some(d) = &args.case_dir
141 && !d.is_dir()
142 {
143 panic!("{} is not a directory", d.display());
144 }
145 if args.jobs == 0 {
146 args.jobs = num_cpus::get() / 2;
147 }
148
149 if args.server_addr.server_addr.is_some()
152 || args.setup_etcd
153 || args.setup_pg
154 || args.setup_mysql
155 || args.kafka_wal_broker_endpoints.is_some()
156 || args.test_filter != ".*"
157 {
158 args.jobs = 1;
159 println!(
160 "Normalizing parallelism to 1 due to server addresses, etcd/pg/mysql setup, or test filter usage"
161 );
162 }
163
164 let config = ConfigBuilder::default()
165 .case_dir(util::get_case_dir(args.case_dir))
166 .fail_fast(args.fail_fast)
167 .test_filter(args.test_filter)
168 .follow_links(true)
169 .env_config_file(args.env_config_file)
170 .interceptor_registry(interceptor_registry)
171 .parallelism(args.jobs)
172 .build()
173 .unwrap();
174
175 let wal = match args.wal {
176 Wal::RaftEngine => WalConfig::RaftEngine,
177 Wal::Kafka => WalConfig::Kafka {
178 needs_kafka_cluster: args.kafka_wal_broker_endpoints.is_none(),
179 broker_endpoints: args
180 .kafka_wal_broker_endpoints
181 .map(|s| s.split(',').map(|s| s.to_string()).collect())
182 .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
184 },
185 };
186
187 let store = StoreConfig {
188 store_addrs: args.store_addrs.clone(),
189 setup_etcd: args.setup_etcd,
190 setup_pg: args.setup_pg,
191 setup_mysql: args.setup_mysql,
192 };
193
194 let runner = Runner::new(
195 config,
196 Env::new(
197 sqlness_home.clone(),
198 args.server_addr.clone(),
199 wal,
200 args.pull_version_on_need,
201 args.bins_dir,
202 store,
203 ),
204 );
205 match runner.run().await {
206 Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
207 Err(e) => {
208 println!("\x1b[31mTest failed: {}\x1b[0m", e);
209 std::process::exit(1);
210 }
211 }
212
213 if !args.preserve_state {
215 if args.setup_etcd {
216 println!("Stopping etcd");
217 util::stop_rm_etcd();
218 }
219 println!("Removing state in {:?}", sqlness_home);
220 tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
221 }
222}