sqlness_runner/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Read;
16use std::net::SocketAddr;
17use std::path::{Path, PathBuf};
18use std::process::Command;
19use std::time::Duration;
20
21use sha2::{Digest, Sha256};
22use tokio::io::AsyncWriteExt;
23use tokio::net::TcpSocket;
24use tokio::time;
25use tokio_stream::StreamExt;
26
27/// Check port every 0.1 second.
28const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100);
29
30pub const PROGRAM: &str = "greptime";
31
32fn http_proxy() -> Option<String> {
33    for proxy in ["http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY"] {
34        if let Ok(proxy_addr) = std::env::var(proxy) {
35            println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
36            return Some(proxy_addr);
37        }
38    }
39    None
40}
41
42fn https_proxy() -> Option<String> {
43    for proxy in ["https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"] {
44        if let Ok(proxy_addr) = std::env::var(proxy) {
45            println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
46            return Some(proxy_addr);
47        }
48    }
49    None
50}
51
52async fn download_files(url: &str, path: &str) {
53    let proxy = if url.starts_with("http://") {
54        http_proxy().map(|proxy| reqwest::Proxy::http(proxy).unwrap())
55    } else if url.starts_with("https://") {
56        https_proxy().map(|proxy| reqwest::Proxy::https(proxy).unwrap())
57    } else {
58        None
59    };
60
61    let client = proxy
62        .map(|proxy| {
63            reqwest::Client::builder()
64                .proxy(proxy)
65                .build()
66                .expect("Failed to build client")
67        })
68        .unwrap_or(reqwest::Client::new());
69
70    let mut file = tokio::fs::File::create(path)
71        .await
72        .unwrap_or_else(|_| panic!("Failed to create file in {path}"));
73    println!("Downloading {}...", url);
74
75    let resp = client
76        .get(url)
77        .send()
78        .await
79        .expect("Failed to send download request");
80    let len = resp.content_length();
81    let mut stream = resp.bytes_stream();
82    let mut size_downloaded = 0;
83
84    while let Some(chunk_result) = stream.next().await {
85        let chunk = chunk_result.unwrap();
86        size_downloaded += chunk.len();
87        if let Some(len) = len {
88            print!("\rDownloading {}/{} bytes", size_downloaded, len);
89        } else {
90            print!("\rDownloaded {} bytes", size_downloaded);
91        }
92
93        file.write_all(&chunk).await.unwrap();
94    }
95
96    file.flush().await.unwrap();
97
98    println!("\nDownloaded {}", url);
99}
100
101fn decompress(archive: &str, dest: &str) {
102    let tar = std::fs::File::open(archive).unwrap();
103    let dec = flate2::read::GzDecoder::new(tar);
104    let mut a = tar::Archive::new(dec);
105    a.unpack(dest).unwrap();
106}
107
108/// Use curl to download the binary from the release page.
109///
110/// # Arguments
111///
112/// * `version` - The version of the binary to download. i.e. "v0.9.5"
113pub async fn pull_binary(version: &str) {
114    let os = std::env::consts::OS;
115    let arch = match std::env::consts::ARCH {
116        "x86_64" => "amd64",
117        "aarch64" => "arm64",
118        _ => panic!("Unsupported arch: {}", std::env::consts::ARCH),
119    };
120    let triple = format!("greptime-{}-{}-{}", os, arch, version);
121    let filename = format!("{triple}.tar.gz");
122
123    let url = format!(
124        "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{filename}"
125    );
126    println!("Downloading {version} binary from {}", url);
127
128    // mkdir {version}
129    let _ = std::fs::create_dir(version);
130
131    let archive = Path::new(version).join(filename);
132    let folder_path = Path::new(version);
133
134    // download the binary to the version directory
135    download_files(&url, &archive.to_string_lossy()).await;
136
137    let checksum_file = format!("{triple}.sha256sum");
138    let checksum_url = format!(
139        "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{checksum_file}"
140    );
141    download_files(
142        &checksum_url,
143        &PathBuf::from_iter([version, &checksum_file]).to_string_lossy(),
144    )
145    .await;
146
147    // verify the checksum
148    let mut file = std::fs::File::open(&archive).unwrap();
149    let mut sha256 = Sha256::new();
150    std::io::copy(&mut file, &mut sha256).unwrap();
151    let checksum: Vec<u8> = sha256.finalize().to_vec();
152
153    let mut expected_checksum =
154        std::fs::File::open(PathBuf::from_iter([version, &checksum_file])).unwrap();
155    let mut buf = String::new();
156    expected_checksum.read_to_string(&mut buf).unwrap();
157    let expected_checksum = hex::decode(buf.lines().next().unwrap()).unwrap();
158
159    assert_eq!(
160        checksum, expected_checksum,
161        "Checksum mismatched, downloaded file is corrupted"
162    );
163
164    decompress(&archive.to_string_lossy(), &folder_path.to_string_lossy());
165    println!("Downloaded and extracted {version} binary to {folder_path:?}");
166
167    // move the binary to the version directory
168    std::fs::rename(
169        PathBuf::from_iter([version, &triple, "greptime"]),
170        PathBuf::from_iter([version, "greptime"]),
171    )
172    .unwrap();
173
174    // remove the archive and inner folder
175    std::fs::remove_file(&archive).unwrap();
176    std::fs::remove_dir(PathBuf::from_iter([version, &triple])).unwrap();
177}
178
179/// Pull the binary if it does not exist and `pull_version_on_need` is true.
180pub async fn maybe_pull_binary(version: &str, pull_version_on_need: bool) {
181    let exist = Path::new(version).join(PROGRAM).is_file();
182    match (exist, pull_version_on_need) {
183        (true, _) => println!("Binary {version} exists"),
184        (false, false) => panic!(
185            "Binary {version} does not exist, please run with --pull-version-on-need or manually download it"
186        ),
187        (false, true) => {
188            pull_binary(version).await;
189        }
190    }
191}
192
193/// Set up a standalone etcd in docker.
194pub fn setup_etcd(client_ports: Vec<u16>, peer_port: Option<u16>, etcd_version: Option<&str>) {
195    if std::process::Command::new("docker")
196        .args(["-v"])
197        .status()
198        .is_err()
199    {
200        panic!("Docker is not installed");
201    }
202    let peer_port = peer_port.unwrap_or(2380);
203    let exposed_port: Vec<_> = client_ports.iter().chain(Some(&peer_port)).collect();
204    let exposed_port_str = exposed_port
205        .iter()
206        .flat_map(|p| ["-p".to_string(), format!("{p}:{p}")])
207        .collect::<Vec<_>>();
208    let etcd_version = etcd_version.unwrap_or("v3.5.17");
209    let etcd_image = format!("quay.io/coreos/etcd:{etcd_version}");
210    let peer_url = format!("http://0.0.0.0:{peer_port}");
211    let my_local_ip = local_ip_address::local_ip().unwrap();
212
213    let my_local_ip_str = my_local_ip.to_string();
214
215    let mut arg_list = vec![];
216    arg_list.extend([
217        "run",
218        "-d",
219        "-v",
220        "/usr/share/ca-certificates/:/etc/ssl/certs",
221    ]);
222    arg_list.extend(exposed_port_str.iter().map(std::ops::Deref::deref));
223    arg_list.extend([
224        "--name",
225        "etcd",
226        &etcd_image,
227        "etcd",
228        "-name",
229        "etcd0",
230        "-advertise-client-urls",
231    ]);
232
233    let adv_client_urls = client_ports
234        .iter()
235        .map(|p| format!("http://{my_local_ip_str}:{p}"))
236        .collect::<Vec<_>>()
237        .join(",");
238
239    arg_list.push(&adv_client_urls);
240
241    arg_list.extend(["-listen-client-urls"]);
242
243    let client_ports_fmt = client_ports
244        .iter()
245        .map(|p| format!("http://0.0.0.0:{p}"))
246        .collect::<Vec<_>>()
247        .join(",");
248
249    arg_list.push(&client_ports_fmt);
250
251    arg_list.push("-initial-advertise-peer-urls");
252    let advertise_peer_url = format!("http://{my_local_ip_str}:{peer_port}");
253    arg_list.push(&advertise_peer_url);
254
255    arg_list.extend(["-listen-peer-urls", &peer_url]);
256
257    arg_list.extend(["-initial-cluster-token", "etcd-cluster-1"]);
258
259    arg_list.push("-initial-cluster");
260
261    let init_cluster_url = format!("etcd0=http://{my_local_ip_str}:{peer_port}");
262
263    arg_list.push(&init_cluster_url);
264
265    arg_list.extend(["-initial-cluster-state", "new"]);
266
267    let mut cmd = std::process::Command::new("docker");
268
269    cmd.args(arg_list);
270
271    println!("Starting etcd with command: {:?}", cmd);
272
273    let status = cmd.status();
274    if status.is_err() {
275        panic!("Failed to start etcd: {:?}", status);
276    } else if let Ok(status) = status {
277        if status.success() {
278            println!(
279                "Started etcd with client ports {:?} and peer port {}, statues:{status:?}",
280                client_ports, peer_port
281            );
282        } else {
283            panic!("Failed to start etcd: {:?}", status);
284        }
285    }
286}
287
288/// Stop and remove the etcd container
289pub fn stop_rm_etcd() {
290    let status = std::process::Command::new("docker")
291        .args(["container", "stop", "etcd"])
292        .status();
293    if status.is_err() {
294        panic!("Failed to stop etcd: {:?}", status);
295    } else {
296        println!("Stopped etcd");
297    }
298    // rm the container
299    let status = std::process::Command::new("docker")
300        .args(["container", "rm", "etcd"])
301        .status();
302    if status.is_err() {
303        panic!("Failed to remove etcd container: {:?}", status);
304    } else {
305        println!("Removed etcd container");
306    }
307}
308
309/// Set up a PostgreSQL server in docker.
310pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
311    if std::process::Command::new("docker")
312        .args(["-v"])
313        .status()
314        .is_err()
315    {
316        panic!("Docker is not installed");
317    }
318
319    let pg_image = if let Some(pg_version) = pg_version {
320        format!("postgres:{pg_version}")
321    } else {
322        "postgres:latest".to_string()
323    };
324    let pg_password = "admin";
325    let pg_user = "greptimedb";
326
327    let mut arg_list = vec![];
328    arg_list.extend(["run", "-d"]);
329
330    let pg_password_env = format!("POSTGRES_PASSWORD={pg_password}");
331    let pg_user_env = format!("POSTGRES_USER={pg_user}");
332    let pg_port_forward = format!("{pg_port}:5432");
333    arg_list.extend(["-e", &pg_password_env, "-e", &pg_user_env]);
334    arg_list.extend(["-p", &pg_port_forward]);
335
336    arg_list.extend(["--name", "greptimedb_pg", &pg_image]);
337
338    let mut cmd = std::process::Command::new("docker");
339
340    cmd.args(arg_list);
341
342    println!("Starting PostgreSQL with command: {:?}", cmd);
343
344    let status = cmd.status();
345    if status.is_err() {
346        panic!("Failed to start PostgreSQL: {:?}", status);
347    } else if let Ok(status) = status {
348        if status.success() {
349            println!("Started PostgreSQL with port {}", pg_port);
350        } else {
351            panic!("Failed to start PostgreSQL: {:?}", status);
352        }
353    }
354}
355
356/// Set up a MySql server in docker.
357pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
358    if std::process::Command::new("docker")
359        .args(["-v"])
360        .status()
361        .is_err()
362    {
363        panic!("Docker is not installed");
364    }
365
366    let mysql_image = if let Some(mysql_version) = mysql_version {
367        format!("greptime/mysql:{mysql_version}")
368    } else {
369        "greptime/mysql:5.7".to_string()
370    };
371    let mysql_password = "admin";
372    let mysql_user = "greptimedb";
373
374    let mut arg_list = vec![];
375    arg_list.extend(["run", "-d"]);
376
377    let mysql_password_env = format!("MYSQL_PASSWORD={mysql_password}");
378    let mysql_user_env = format!("MYSQL_USER={mysql_user}");
379    let mysql_root_password_env = format!("MYSQL_ROOT_PASSWORD={mysql_password}");
380    let mysql_port_forward = format!("{mysql_port}:3306");
381    arg_list.extend([
382        "-e",
383        &mysql_password_env,
384        "-e",
385        &mysql_user_env,
386        "-e",
387        &mysql_root_password_env,
388        "-e",
389        "MYSQL_DATABASE=mysql",
390    ]);
391    arg_list.extend(["-p", &mysql_port_forward]);
392
393    arg_list.extend(["--name", "greptimedb_mysql", &mysql_image]);
394
395    let mut cmd = std::process::Command::new("docker");
396
397    cmd.args(arg_list);
398
399    println!("Starting MySQL with command: {:?}", cmd);
400
401    let status = cmd.status();
402    if status.is_err() {
403        panic!("Failed to start MySQL: {:?}", status);
404    } else if let Ok(status) = status {
405        if status.success() {
406            println!("Started MySQL with port {}", mysql_port);
407        } else {
408            panic!("Failed to start MySQL: {:?}", status);
409        }
410    }
411}
412
413/// Get the dir of test cases. This function only works when the runner is run
414/// under the project's dir because it depends on some envs set by cargo.
415pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {
416    let runner_path = match case_dir {
417        Some(path) => path,
418        None => {
419            // retrieve the manifest runner (./tests/runner)
420            let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
421            // change directory to cases' dir from runner's (should be runner/../cases)
422            let _ = runner_crate_path.pop();
423            runner_crate_path.push("cases");
424            runner_crate_path
425        }
426    };
427
428    runner_path.into_os_string().into_string().unwrap()
429}
430
431/// Get the dir that contains workspace manifest (the top-level Cargo.toml).
432pub fn get_workspace_root() -> String {
433    // retrieve the manifest runner (./tests/runner)
434    let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
435
436    // change directory to workspace's root (runner/../..)
437    let _ = runner_crate_path.pop();
438    let _ = runner_crate_path.pop();
439
440    runner_crate_path.into_os_string().into_string().unwrap()
441}
442
443pub fn get_binary_dir(mode: &str) -> PathBuf {
444    // first go to the workspace root.
445    let mut workspace_root = PathBuf::from(get_workspace_root());
446
447    // change directory to target dir (workspace/target/<build mode>/)
448    workspace_root.push("target");
449    workspace_root.push(mode);
450
451    workspace_root
452}
453
454/// Spin-waiting a socket address is available, or timeout.
455/// Returns whether the addr is up.
456pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool {
457    let check_task = async {
458        loop {
459            let socket = TcpSocket::new_v4().expect("Cannot create v4 socket");
460            match socket.connect(ip_addr).await {
461                Ok(mut stream) => {
462                    let _ = stream.shutdown().await;
463                    break;
464                }
465                Err(_) => time::sleep(PORT_CHECK_INTERVAL).await,
466            }
467        }
468    };
469
470    tokio::time::timeout(timeout, check_task).await.is_ok()
471}
472
473/// Get the path of sqlness config dir `tests/conf`.
474pub fn sqlness_conf_path() -> PathBuf {
475    let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
476    sqlness_root_path.pop();
477    sqlness_root_path.push("conf");
478    sqlness_root_path
479}
480
481/// Start kafka cluster if needed. Config file is `conf/kafka-cluster.yml`.
482///
483/// ```shell
484/// docker compose -f kafka-cluster.yml up kafka -d --wait
485/// ```
486pub fn setup_wal() {
487    let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
488    sqlness_root_path.pop();
489    sqlness_root_path.push("conf");
490
491    Command::new("docker")
492        .current_dir(sqlness_conf_path())
493        .args([
494            "compose",
495            "-f",
496            "kafka-cluster.yml",
497            "up",
498            "kafka",
499            "-d",
500            "--wait",
501        ])
502        .output()
503        .expect("Failed to start kafka cluster");
504
505    println!("kafka cluster is up");
506}
507
508/// Stop kafka cluster if needed. Config file is `conf/kafka-cluster.yml`.
509///
510/// ```shell
511/// docker compose -f docker-compose-standalone.yml down kafka
512/// ```
513pub fn teardown_wal() {
514    let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
515    sqlness_root_path.pop();
516    sqlness_root_path.push("conf");
517
518    Command::new("docker")
519        .current_dir(sqlness_conf_path())
520        .args(["compose", "-f", "kafka-cluster.yml", "down", "kafka"])
521        .output()
522        .expect("Failed to stop kafka cluster");
523
524    println!("kafka cluster is down");
525}
526
527/// Get a random available port by binding to port 0
528pub fn get_random_port() -> u16 {
529    use std::net::TcpListener;
530    let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port");
531    listener
532        .local_addr()
533        .expect("Failed to get local address")
534        .port()
535}