#![allow(clippy::print_stdout)]
use std::path::PathBuf;
use std::sync::Arc;
use clap::{Parser, ValueEnum};
use env::{Env, WalConfig};
use sqlness::interceptor::Registry;
use sqlness::{ConfigBuilder, Runner};
use crate::env::StoreConfig;
mod env;
mod protocol_interceptor;
mod server_mode;
mod util;
#[derive(ValueEnum, Debug, Clone)]
#[clap(rename_all = "snake_case")]
enum Wal {
RaftEngine,
Kafka,
}
#[derive(clap::Args, Debug, Clone, Default)]
#[group(multiple = true, requires_all=["server_addr", "pg_server_addr", "mysql_server_addr"])]
struct ServerAddr {
#[clap(short, long)]
server_addr: Option<String>,
#[clap(short, long, requires = "server_addr")]
pg_server_addr: Option<String>,
#[clap(short, long, requires = "server_addr")]
mysql_server_addr: Option<String>,
}
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
#[clap(short, long)]
case_dir: Option<PathBuf>,
#[arg(short, long, default_value = "false")]
fail_fast: bool,
#[clap(short, long, default_value = "config.toml")]
env_config_file: String,
#[clap(short, long, default_value = ".*")]
test_filter: String,
#[command(flatten)]
server_addr: ServerAddr,
#[clap(short, long, default_value = "raft_engine")]
wal: Wal,
#[clap(short, long)]
kafka_wal_broker_endpoints: Option<String>,
#[clap(long)]
bins_dir: Option<PathBuf>,
#[clap(long)]
preserve_state: bool,
#[clap(long, default_value = "true")]
pull_version_on_need: bool,
#[clap(long)]
store_addrs: Vec<String>,
#[clap(long, default_value = "false")]
setup_etcd: bool,
#[clap(long, default_value = "false")]
setup_pg: bool,
#[clap(long, default_value = "false")]
setup_mysql: bool,
#[clap(short, long, default_value = "0")]
jobs: usize,
}
#[tokio::main]
async fn main() {
let mut args = Args::parse();
let temp_dir = tempfile::Builder::new()
.prefix("sqlness")
.tempdir()
.unwrap();
let sqlness_home = temp_dir.into_path();
let mut interceptor_registry: Registry = Default::default();
interceptor_registry.register(
protocol_interceptor::PREFIX,
Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
);
if let Some(d) = &args.case_dir {
if !d.is_dir() {
panic!("{} is not a directory", d.display());
}
}
if args.jobs == 0 {
args.jobs = num_cpus::get() / 2;
}
if args.server_addr.server_addr.is_some()
|| args.setup_etcd
|| args.setup_pg
|| args.setup_mysql
|| args.kafka_wal_broker_endpoints.is_some()
{
args.jobs = 1;
println!("Normalizing parallelism to 1 due to server addresses or etcd/pg/mysql setup");
}
let config = ConfigBuilder::default()
.case_dir(util::get_case_dir(args.case_dir))
.fail_fast(args.fail_fast)
.test_filter(args.test_filter)
.follow_links(true)
.env_config_file(args.env_config_file)
.interceptor_registry(interceptor_registry)
.parallelism(args.jobs)
.build()
.unwrap();
let wal = match args.wal {
Wal::RaftEngine => WalConfig::RaftEngine,
Wal::Kafka => WalConfig::Kafka {
needs_kafka_cluster: args.kafka_wal_broker_endpoints.is_none(),
broker_endpoints: args
.kafka_wal_broker_endpoints
.map(|s| s.split(',').map(|s| s.to_string()).collect())
.unwrap_or(vec!["127.0.0.1:9092".to_string()]),
},
};
let store = StoreConfig {
store_addrs: args.store_addrs.clone(),
setup_etcd: args.setup_etcd,
setup_pg: args.setup_pg,
setup_mysql: args.setup_mysql,
};
let runner = Runner::new(
config,
Env::new(
sqlness_home.clone(),
args.server_addr.clone(),
wal,
args.pull_version_on_need,
args.bins_dir,
store,
),
);
match runner.run().await {
Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
Err(e) => {
println!("\x1b[31mTest failed: {}\x1b[0m", e);
std::process::exit(1);
}
}
if !args.preserve_state {
if args.setup_etcd {
println!("Stopping etcd");
util::stop_rm_etcd();
}
println!("Removing state in {:?}", sqlness_home);
tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
}
}