sqlness_runner/
main.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::print_stdout)]
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use clap::{Parser, ValueEnum};
21use env::{Env, WalConfig};
22use sqlness::interceptor::Registry;
23use sqlness::{ConfigBuilder, Runner};
24
25use crate::env::StoreConfig;
26
27mod env;
28mod protocol_interceptor;
29mod server_mode;
30mod util;
31
32#[derive(ValueEnum, Debug, Clone)]
33#[clap(rename_all = "snake_case")]
34enum Wal {
35    RaftEngine,
36    Kafka,
37}
38
39// add a group to ensure that all server addresses are set together
40#[derive(clap::Args, Debug, Clone, Default)]
41#[group(multiple = true, requires_all=["server_addr", "pg_server_addr", "mysql_server_addr"])]
42struct ServerAddr {
43    /// Address of the grpc server.
44    #[clap(short, long)]
45    server_addr: Option<String>,
46
47    /// Address of the postgres server. Must be set if server_addr is set.
48    #[clap(short, long, requires = "server_addr")]
49    pg_server_addr: Option<String>,
50
51    /// Address of the mysql server. Must be set if server_addr is set.
52    #[clap(short, long, requires = "server_addr")]
53    mysql_server_addr: Option<String>,
54}
55
56#[derive(Parser, Debug)]
57#[clap(author, version, about, long_about = None)]
58/// SQL Harness for GrepTimeDB
59struct Args {
60    /// Directory of test cases
61    #[clap(short, long)]
62    case_dir: Option<PathBuf>,
63
64    /// Fail this run as soon as one case fails if true
65    #[arg(short, long, default_value = "false")]
66    fail_fast: bool,
67
68    /// Environment Configuration File
69    #[clap(short, long, default_value = "config.toml")]
70    env_config_file: String,
71
72    /// Name of test cases to run. Accept as a regexp.
73    #[clap(short, long, default_value = ".*")]
74    test_filter: String,
75
76    /// Addresses of the server.
77    #[command(flatten)]
78    server_addr: ServerAddr,
79
80    /// The type of Wal.
81    #[clap(short, long, default_value = "raft_engine")]
82    wal: Wal,
83
84    /// The kafka wal broker endpoints. This config will suppress sqlness runner
85    /// from starting a kafka cluster, and use the given endpoint as kafka backend.
86    #[clap(short, long)]
87    kafka_wal_broker_endpoints: Option<String>,
88
89    /// The path to the directory where GreptimeDB's binaries resides.
90    /// If not set, sqlness will build GreptimeDB on the fly.
91    #[clap(long)]
92    bins_dir: Option<PathBuf>,
93
94    /// Preserve persistent state in the temporary directory.
95    /// This may affect future test runs.
96    #[clap(long)]
97    preserve_state: bool,
98
99    /// Pull Different versions of GreptimeDB on need.
100    #[clap(long, default_value = "true")]
101    pull_version_on_need: bool,
102
103    /// The store addresses for metadata, if empty, will use memory store.
104    #[clap(long)]
105    store_addrs: Vec<String>,
106
107    /// Whether to setup etcd, by default it is false.
108    #[clap(long, default_value = "false")]
109    setup_etcd: bool,
110
111    /// Whether to setup pg, by default it is false.
112    #[clap(long, default_value = "false")]
113    setup_pg: bool,
114
115    /// Whether to setup mysql, by default it is false.
116    #[clap(long, default_value = "false")]
117    setup_mysql: bool,
118
119    /// The number of jobs to run in parallel. Default to half of the cores.
120    #[clap(short, long, default_value = "0")]
121    jobs: usize,
122}
123
124#[tokio::main]
125async fn main() {
126    let mut args = Args::parse();
127
128    let temp_dir = tempfile::Builder::new()
129        .prefix("sqlness")
130        .tempdir()
131        .unwrap();
132    let sqlness_home = temp_dir.into_path();
133
134    let mut interceptor_registry: Registry = Default::default();
135    interceptor_registry.register(
136        protocol_interceptor::PREFIX,
137        Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
138    );
139
140    if let Some(d) = &args.case_dir {
141        if !d.is_dir() {
142            panic!("{} is not a directory", d.display());
143        }
144    }
145    if args.jobs == 0 {
146        args.jobs = num_cpus::get() / 2;
147    }
148
149    // normalize parallelism to 1 if any of the following conditions are met:
150    // Note: parallelism in pg and mysql is possible, but need configuration.
151    if args.server_addr.server_addr.is_some()
152        || args.setup_etcd
153        || args.setup_pg
154        || args.setup_mysql
155        || args.kafka_wal_broker_endpoints.is_some()
156    {
157        args.jobs = 1;
158        println!("Normalizing parallelism to 1 due to server addresses or etcd/pg/mysql setup");
159    }
160
161    let config = ConfigBuilder::default()
162        .case_dir(util::get_case_dir(args.case_dir))
163        .fail_fast(args.fail_fast)
164        .test_filter(args.test_filter)
165        .follow_links(true)
166        .env_config_file(args.env_config_file)
167        .interceptor_registry(interceptor_registry)
168        .parallelism(args.jobs)
169        .build()
170        .unwrap();
171
172    let wal = match args.wal {
173        Wal::RaftEngine => WalConfig::RaftEngine,
174        Wal::Kafka => WalConfig::Kafka {
175            needs_kafka_cluster: args.kafka_wal_broker_endpoints.is_none(),
176            broker_endpoints: args
177                .kafka_wal_broker_endpoints
178                .map(|s| s.split(',').map(|s| s.to_string()).collect())
179                // otherwise default to the same port in `kafka-cluster.yml`
180                .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
181        },
182    };
183
184    let store = StoreConfig {
185        store_addrs: args.store_addrs.clone(),
186        setup_etcd: args.setup_etcd,
187        setup_pg: args.setup_pg,
188        setup_mysql: args.setup_mysql,
189    };
190
191    let runner = Runner::new(
192        config,
193        Env::new(
194            sqlness_home.clone(),
195            args.server_addr.clone(),
196            wal,
197            args.pull_version_on_need,
198            args.bins_dir,
199            store,
200        ),
201    );
202    match runner.run().await {
203        Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
204        Err(e) => {
205            println!("\x1b[31mTest failed: {}\x1b[0m", e);
206            std::process::exit(1);
207        }
208    }
209
210    // clean up and exit
211    if !args.preserve_state {
212        if args.setup_etcd {
213            println!("Stopping etcd");
214            util::stop_rm_etcd();
215        }
216        println!("Removing state in {:?}", sqlness_home);
217        tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
218    }
219}