sqlness_runner/cmd/
bare.rs1use std::path::PathBuf;
16use std::sync::Arc;
17
18use clap::{Parser, ValueEnum};
19use sqlness::interceptor::Registry;
20use sqlness::{ConfigBuilder, Runner};
21
22use crate::cmd::SqlnessConfig;
23use crate::env::bare::{Env, ServiceProvider, StoreConfig, WalConfig};
24use crate::{protocol_interceptor, util};
25
26#[derive(ValueEnum, Debug, Clone)]
27#[clap(rename_all = "snake_case")]
28enum Wal {
29 RaftEngine,
30 Kafka,
31}
32
33#[derive(clap::Args, Debug, Clone, Default)]
35pub(crate) struct ServerAddr {
36 #[clap(short, long)]
38 pub(crate) server_addr: Option<String>,
39
40 #[clap(short, long, requires = "server_addr")]
42 pub(crate) pg_server_addr: Option<String>,
43
44 #[clap(short, long, requires = "server_addr")]
46 pub(crate) mysql_server_addr: Option<String>,
47}
48
49#[derive(Debug, Parser)]
50pub struct BareCommand {
52 #[clap(flatten)]
53 config: SqlnessConfig,
54
55 #[command(flatten)]
57 server_addr: ServerAddr,
58
59 #[clap(short, long, default_value = "raft_engine")]
61 wal: Wal,
62
63 #[clap(short, long)]
66 kafka_wal_broker_endpoints: Option<String>,
67
68 #[clap(long)]
71 bins_dir: Option<PathBuf>,
72
73 #[clap(long)]
76 preserve_state: bool,
77
78 #[clap(long, default_value = "true")]
80 pull_version_on_need: bool,
81
82 #[clap(long)]
84 store_addrs: Vec<String>,
85
86 #[clap(long, default_value = "false")]
88 setup_etcd: bool,
89
90 #[clap(long, default_missing_value = "", num_args(0..=1))]
92 setup_pg: Option<ServiceProvider>,
93
94 #[clap(long, default_missing_value = "", num_args(0..=1))]
96 setup_mysql: Option<ServiceProvider>,
97
98 #[clap(short, long, default_value = "0")]
100 jobs: usize,
101
102 #[clap(long)]
104 extra_args: Vec<String>,
105
106 #[clap(long, default_value = "false")]
108 enable_flat_format: bool,
109}
110
111impl BareCommand {
112 pub async fn run(mut self) {
113 let temp_dir = tempfile::Builder::new()
114 .prefix("sqlness")
115 .tempdir()
116 .unwrap();
117 let sqlness_home = temp_dir.keep();
118
119 let mut interceptor_registry: Registry = Default::default();
120 interceptor_registry.register(
121 protocol_interceptor::PREFIX,
122 Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
123 );
124
125 if let Some(d) = &self.config.case_dir
126 && !d.is_dir()
127 {
128 panic!("{} is not a directory", d.display());
129 }
130 if self.jobs == 0 {
131 self.jobs = num_cpus::get() / 2;
132 }
133
134 if self.server_addr.server_addr.is_some()
137 || self.setup_etcd
138 || self.setup_pg.is_some()
139 || self.setup_mysql.is_some()
140 || self.kafka_wal_broker_endpoints.is_some()
141 || self.config.test_filter != ".*"
142 {
143 self.jobs = 1;
144 println!(
145 "Normalizing parallelism to 1 due to server addresses, etcd/pg/mysql setup, or test filter usage"
146 );
147 }
148
149 let config = ConfigBuilder::default()
150 .case_dir(util::get_case_dir(self.config.case_dir))
151 .fail_fast(self.config.fail_fast)
152 .test_filter(self.config.test_filter)
153 .follow_links(true)
154 .env_config_file(self.config.env_config_file)
155 .interceptor_registry(interceptor_registry)
156 .parallelism(self.jobs)
157 .build()
158 .unwrap();
159
160 let wal = match self.wal {
161 Wal::RaftEngine => WalConfig::RaftEngine,
162 Wal::Kafka => WalConfig::Kafka {
163 needs_kafka_cluster: self.kafka_wal_broker_endpoints.is_none(),
164 broker_endpoints: self
165 .kafka_wal_broker_endpoints
166 .map(|s| s.split(',').map(|s| s.to_string()).collect())
167 .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
169 },
170 };
171
172 let store = StoreConfig {
173 store_addrs: self.store_addrs.clone(),
174 setup_etcd: self.setup_etcd,
175 setup_pg: self.setup_pg,
176 setup_mysql: self.setup_mysql,
177 enable_flat_format: self.enable_flat_format,
178 };
179
180 let runner = Runner::new(
181 config,
182 Env::new(
183 sqlness_home.clone(),
184 self.server_addr,
185 wal,
186 self.pull_version_on_need,
187 self.bins_dir,
188 store,
189 self.extra_args,
190 ),
191 );
192 match runner.run().await {
193 Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
194 Err(e) => {
195 println!("\x1b[31mTest failed: {}\x1b[0m", e);
196 std::process::exit(1);
197 }
198 }
199
200 if !self.preserve_state {
202 if self.setup_etcd {
203 println!("Stopping etcd");
204 util::stop_rm_etcd();
205 }
206 println!("Removing state in {:?}", sqlness_home);
208 tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
209 }
210 }
211}