1#![allow(clippy::print_stdout)]
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use clap::{Parser, ValueEnum};
21use env::{Env, WalConfig};
22use sqlness::interceptor::Registry;
23use sqlness::{ConfigBuilder, Runner};
24
25use crate::env::StoreConfig;
26
27mod env;
28mod protocol_interceptor;
29mod server_mode;
30mod util;
31
32#[derive(ValueEnum, Debug, Clone)]
33#[clap(rename_all = "snake_case")]
34enum Wal {
35 RaftEngine,
36 Kafka,
37}
38
39#[derive(clap::Args, Debug, Clone, Default)]
41#[group(multiple = true, requires_all=["server_addr", "pg_server_addr", "mysql_server_addr"])]
42struct ServerAddr {
43 #[clap(short, long)]
45 server_addr: Option<String>,
46
47 #[clap(short, long, requires = "server_addr")]
49 pg_server_addr: Option<String>,
50
51 #[clap(short, long, requires = "server_addr")]
53 mysql_server_addr: Option<String>,
54}
55
56#[derive(Parser, Debug)]
57#[clap(author, version, about, long_about = None)]
58struct Args {
60 #[clap(short, long)]
62 case_dir: Option<PathBuf>,
63
64 #[arg(short, long, default_value = "false")]
66 fail_fast: bool,
67
68 #[clap(short, long, default_value = "config.toml")]
70 env_config_file: String,
71
72 #[clap(short, long, default_value = ".*")]
74 test_filter: String,
75
76 #[command(flatten)]
78 server_addr: ServerAddr,
79
80 #[clap(short, long, default_value = "raft_engine")]
82 wal: Wal,
83
84 #[clap(short, long)]
87 kafka_wal_broker_endpoints: Option<String>,
88
89 #[clap(long)]
92 bins_dir: Option<PathBuf>,
93
94 #[clap(long)]
97 preserve_state: bool,
98
99 #[clap(long, default_value = "true")]
101 pull_version_on_need: bool,
102
103 #[clap(long)]
105 store_addrs: Vec<String>,
106
107 #[clap(long, default_value = "false")]
109 setup_etcd: bool,
110
111 #[clap(long, default_value = "false")]
113 setup_pg: bool,
114
115 #[clap(long, default_value = "false")]
117 setup_mysql: bool,
118
119 #[clap(short, long, default_value = "0")]
121 jobs: usize,
122}
123
124#[tokio::main]
125async fn main() {
126 let mut args = Args::parse();
127
128 let temp_dir = tempfile::Builder::new()
129 .prefix("sqlness")
130 .tempdir()
131 .unwrap();
132 let sqlness_home = temp_dir.into_path();
133
134 let mut interceptor_registry: Registry = Default::default();
135 interceptor_registry.register(
136 protocol_interceptor::PREFIX,
137 Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
138 );
139
140 if let Some(d) = &args.case_dir {
141 if !d.is_dir() {
142 panic!("{} is not a directory", d.display());
143 }
144 }
145 if args.jobs == 0 {
146 args.jobs = num_cpus::get() / 2;
147 }
148
149 if args.server_addr.server_addr.is_some()
152 || args.setup_etcd
153 || args.setup_pg
154 || args.setup_mysql
155 || args.kafka_wal_broker_endpoints.is_some()
156 {
157 args.jobs = 1;
158 println!("Normalizing parallelism to 1 due to server addresses or etcd/pg/mysql setup");
159 }
160
161 let config = ConfigBuilder::default()
162 .case_dir(util::get_case_dir(args.case_dir))
163 .fail_fast(args.fail_fast)
164 .test_filter(args.test_filter)
165 .follow_links(true)
166 .env_config_file(args.env_config_file)
167 .interceptor_registry(interceptor_registry)
168 .parallelism(args.jobs)
169 .build()
170 .unwrap();
171
172 let wal = match args.wal {
173 Wal::RaftEngine => WalConfig::RaftEngine,
174 Wal::Kafka => WalConfig::Kafka {
175 needs_kafka_cluster: args.kafka_wal_broker_endpoints.is_none(),
176 broker_endpoints: args
177 .kafka_wal_broker_endpoints
178 .map(|s| s.split(',').map(|s| s.to_string()).collect())
179 .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
181 },
182 };
183
184 let store = StoreConfig {
185 store_addrs: args.store_addrs.clone(),
186 setup_etcd: args.setup_etcd,
187 setup_pg: args.setup_pg,
188 setup_mysql: args.setup_mysql,
189 };
190
191 let runner = Runner::new(
192 config,
193 Env::new(
194 sqlness_home.clone(),
195 args.server_addr.clone(),
196 wal,
197 args.pull_version_on_need,
198 args.bins_dir,
199 store,
200 ),
201 );
202 match runner.run().await {
203 Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
204 Err(e) => {
205 println!("\x1b[31mTest failed: {}\x1b[0m", e);
206 std::process::exit(1);
207 }
208 }
209
210 if !args.preserve_state {
212 if args.setup_etcd {
213 println!("Stopping etcd");
214 util::stop_rm_etcd();
215 }
216 println!("Removing state in {:?}", sqlness_home);
217 tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
218 }
219}