sqlness_runner/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Read;
16use std::net::SocketAddr;
17use std::path::{Path, PathBuf};
18use std::process::Command;
19use std::time::Duration;
20
21use sha2::{Digest, Sha256};
22use tokio::io::AsyncWriteExt;
23use tokio::net::TcpSocket;
24use tokio::time;
25use tokio_stream::StreamExt;
26
/// Pause between two consecutive connection attempts while polling a port (100 ms).
const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100);

/// File name of the `greptime` executable: `greptime.exe` on Windows,
/// `./greptime` on every other platform.
pub const PROGRAM: &str = if cfg!(windows) {
    "greptime.exe"
} else {
    "./greptime"
};
34
/// Look up the proxy address to use for plain `http://` requests.
///
/// The environment variables `http_proxy`, `HTTP_PROXY`, `all_proxy` and
/// `ALL_PROXY` are consulted in that order; the first one holding a non-blank
/// value wins. A variable that is set but empty (or whitespace only) is
/// skipped, because an empty string is not a usable proxy address.
///
/// Returns `None` when none of the variables carries a usable value.
fn http_proxy() -> Option<String> {
    for proxy in ["http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY"] {
        // `var` fails for unset or non-UTF-8 values; both mean "try the next one".
        if let Ok(proxy_addr) = std::env::var(proxy) {
            if proxy_addr.trim().is_empty() {
                continue;
            }
            println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
            return Some(proxy_addr);
        }
    }
    None
}
44
/// Look up the proxy address to use for `https://` requests.
///
/// The environment variables `https_proxy`, `HTTPS_PROXY`, `all_proxy` and
/// `ALL_PROXY` are consulted in that order; the first one holding a non-blank
/// value wins. A variable that is set but empty (or whitespace only) is
/// skipped, because an empty string is not a usable proxy address.
///
/// Returns `None` when none of the variables carries a usable value.
fn https_proxy() -> Option<String> {
    for proxy in ["https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"] {
        // `var` fails for unset or non-UTF-8 values; both mean "try the next one".
        if let Ok(proxy_addr) = std::env::var(proxy) {
            if proxy_addr.trim().is_empty() {
                continue;
            }
            println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
            return Some(proxy_addr);
        }
    }
    None
}
54
55async fn download_files(url: &str, path: &str) {
56    let proxy = if url.starts_with("http://") {
57        http_proxy().map(|proxy| reqwest::Proxy::http(proxy).unwrap())
58    } else if url.starts_with("https://") {
59        https_proxy().map(|proxy| reqwest::Proxy::https(proxy).unwrap())
60    } else {
61        None
62    };
63
64    let client = proxy
65        .map(|proxy| {
66            reqwest::Client::builder()
67                .proxy(proxy)
68                .build()
69                .expect("Failed to build client")
70        })
71        .unwrap_or(reqwest::Client::new());
72
73    let mut file = tokio::fs::File::create(path)
74        .await
75        .unwrap_or_else(|_| panic!("Failed to create file in {path}"));
76    println!("Downloading {}...", url);
77
78    let resp = client
79        .get(url)
80        .send()
81        .await
82        .expect("Failed to send download request");
83    let len = resp.content_length();
84    let mut stream = resp.bytes_stream();
85    let mut size_downloaded = 0;
86
87    while let Some(chunk_result) = stream.next().await {
88        let chunk = chunk_result.unwrap();
89        size_downloaded += chunk.len();
90        if let Some(len) = len {
91            print!("\rDownloading {}/{} bytes", size_downloaded, len);
92        } else {
93            print!("\rDownloaded {} bytes", size_downloaded);
94        }
95
96        file.write_all(&chunk).await.unwrap();
97    }
98
99    file.flush().await.unwrap();
100
101    println!("\nDownloaded {}", url);
102}
103
104fn decompress(archive: &str, dest: &str) {
105    let tar = std::fs::File::open(archive).unwrap();
106    let dec = flate2::read::GzDecoder::new(tar);
107    let mut a = tar::Archive::new(dec);
108    a.unpack(dest).unwrap();
109}
110
111/// Use curl to download the binary from the release page.
112///
113/// # Arguments
114///
115/// * `version` - The version of the binary to download. i.e. "v0.9.5"
116pub async fn pull_binary(version: &str) {
117    let os = std::env::consts::OS;
118    let arch = match std::env::consts::ARCH {
119        "x86_64" => "amd64",
120        "aarch64" => "arm64",
121        _ => panic!("Unsupported arch: {}", std::env::consts::ARCH),
122    };
123    let triple = format!("greptime-{}-{}-{}", os, arch, version);
124    let filename = format!("{triple}.tar.gz");
125
126    let url = format!(
127        "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{filename}"
128    );
129    println!("Downloading {version} binary from {}", url);
130
131    // mkdir {version}
132    let _ = std::fs::create_dir(version);
133
134    let archive = Path::new(version).join(filename);
135    let folder_path = Path::new(version);
136
137    // download the binary to the version directory
138    download_files(&url, &archive.to_string_lossy()).await;
139
140    let checksum_file = format!("{triple}.sha256sum");
141    let checksum_url = format!(
142        "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{checksum_file}"
143    );
144    download_files(
145        &checksum_url,
146        &PathBuf::from_iter([version, &checksum_file]).to_string_lossy(),
147    )
148    .await;
149
150    // verify the checksum
151    let mut file = std::fs::File::open(&archive).unwrap();
152    let mut sha256 = Sha256::new();
153    std::io::copy(&mut file, &mut sha256).unwrap();
154    let checksum: Vec<u8> = sha256.finalize().to_vec();
155
156    let mut expected_checksum =
157        std::fs::File::open(PathBuf::from_iter([version, &checksum_file])).unwrap();
158    let mut buf = String::new();
159    expected_checksum.read_to_string(&mut buf).unwrap();
160    let expected_checksum = hex::decode(buf.lines().next().unwrap()).unwrap();
161
162    assert_eq!(
163        checksum, expected_checksum,
164        "Checksum mismatched, downloaded file is corrupted"
165    );
166
167    decompress(&archive.to_string_lossy(), &folder_path.to_string_lossy());
168    println!("Downloaded and extracted {version} binary to {folder_path:?}");
169
170    // move the binary to the version directory
171    std::fs::rename(
172        PathBuf::from_iter([version, &triple, "greptime"]),
173        PathBuf::from_iter([version, "greptime"]),
174    )
175    .unwrap();
176
177    // remove the archive and inner folder
178    std::fs::remove_file(&archive).unwrap();
179    std::fs::remove_dir(PathBuf::from_iter([version, &triple])).unwrap();
180}
181
182/// Pull the binary if it does not exist and `pull_version_on_need` is true.
183pub async fn maybe_pull_binary(version: &str, pull_version_on_need: bool) {
184    let exist = Path::new(version).join(PROGRAM).is_file();
185    match (exist, pull_version_on_need){
186        (true, _) => println!("Binary {version} exists"),
187        (false, false) => panic!("Binary {version} does not exist, please run with --pull-version-on-need or manually download it"),
188        (false, true) => { pull_binary(version).await; },
189    }
190}
191
192/// Set up a standalone etcd in docker.
193pub fn setup_etcd(client_ports: Vec<u16>, peer_port: Option<u16>, etcd_version: Option<&str>) {
194    if std::process::Command::new("docker")
195        .args(["-v"])
196        .status()
197        .is_err()
198    {
199        panic!("Docker is not installed");
200    }
201    let peer_port = peer_port.unwrap_or(2380);
202    let exposed_port: Vec<_> = client_ports.iter().chain(Some(&peer_port)).collect();
203    let exposed_port_str = exposed_port
204        .iter()
205        .flat_map(|p| ["-p".to_string(), format!("{p}:{p}")])
206        .collect::<Vec<_>>();
207    let etcd_version = etcd_version.unwrap_or("v3.5.17");
208    let etcd_image = format!("quay.io/coreos/etcd:{etcd_version}");
209    let peer_url = format!("http://0.0.0.0:{peer_port}");
210    let my_local_ip = local_ip_address::local_ip().unwrap();
211
212    let my_local_ip_str = my_local_ip.to_string();
213
214    let mut arg_list = vec![];
215    arg_list.extend([
216        "run",
217        "-d",
218        "-v",
219        "/usr/share/ca-certificates/:/etc/ssl/certs",
220    ]);
221    arg_list.extend(exposed_port_str.iter().map(std::ops::Deref::deref));
222    arg_list.extend([
223        "--name",
224        "etcd",
225        &etcd_image,
226        "etcd",
227        "-name",
228        "etcd0",
229        "-advertise-client-urls",
230    ]);
231
232    let adv_client_urls = client_ports
233        .iter()
234        .map(|p| format!("http://{my_local_ip_str}:{p}"))
235        .collect::<Vec<_>>()
236        .join(",");
237
238    arg_list.push(&adv_client_urls);
239
240    arg_list.extend(["-listen-client-urls"]);
241
242    let client_ports_fmt = client_ports
243        .iter()
244        .map(|p| format!("http://0.0.0.0:{p}"))
245        .collect::<Vec<_>>()
246        .join(",");
247
248    arg_list.push(&client_ports_fmt);
249
250    arg_list.push("-initial-advertise-peer-urls");
251    let advertise_peer_url = format!("http://{my_local_ip_str}:{peer_port}");
252    arg_list.push(&advertise_peer_url);
253
254    arg_list.extend(["-listen-peer-urls", &peer_url]);
255
256    arg_list.extend(["-initial-cluster-token", "etcd-cluster-1"]);
257
258    arg_list.push("-initial-cluster");
259
260    let init_cluster_url = format!("etcd0=http://{my_local_ip_str}:{peer_port}");
261
262    arg_list.push(&init_cluster_url);
263
264    arg_list.extend(["-initial-cluster-state", "new"]);
265
266    let mut cmd = std::process::Command::new("docker");
267
268    cmd.args(arg_list);
269
270    println!("Starting etcd with command: {:?}", cmd);
271
272    let status = cmd.status();
273    if status.is_err() {
274        panic!("Failed to start etcd: {:?}", status);
275    } else if let Ok(status) = status {
276        if status.success() {
277            println!(
278                "Started etcd with client ports {:?} and peer port {}, statues:{status:?}",
279                client_ports, peer_port
280            );
281        } else {
282            panic!("Failed to start etcd: {:?}", status);
283        }
284    }
285}
286
/// Stop and remove the etcd container
///
/// Runs `docker container stop etcd` followed by `docker container rm etcd`.
/// A non-zero exit of either command (for example because the container does
/// not exist) is reported on stderr but does not abort, so the removal is
/// still attempted after a failed stop.
///
/// # Panics
///
/// Panics if the `docker` command itself cannot be launched.
pub fn stop_rm_etcd() {
    let stop_status = std::process::Command::new("docker")
        .args(["container", "stop", "etcd"])
        .status();
    match stop_status {
        Ok(status) if status.success() => println!("Stopped etcd"),
        // docker ran but refused: do not claim success.
        Ok(status) => eprintln!("Failed to stop etcd: {:?}", status),
        failure @ Err(_) => panic!("Failed to stop etcd: {:?}", failure),
    }

    // rm the container
    let rm_status = std::process::Command::new("docker")
        .args(["container", "rm", "etcd"])
        .status();
    match rm_status {
        Ok(status) if status.success() => println!("Removed etcd container"),
        Ok(status) => eprintln!("Failed to remove etcd container: {:?}", status),
        failure @ Err(_) => panic!("Failed to remove etcd container: {:?}", failure),
    }
}
307
/// Launch a PostgreSQL server as a detached docker container named
/// `greptimedb_pg`.
///
/// Host port `pg_port` is forwarded to container port 5432. The image tag is
/// `pg_version` when given and `latest` otherwise. The container is created
/// with user `greptimedb` and password `admin`.
///
/// Panics if the `docker` command cannot be launched or `docker run` exits
/// unsuccessfully.
pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
    let docker_probe = std::process::Command::new("docker")
        .args(["-v"])
        .status();
    if docker_probe.is_err() {
        panic!("Docker is not installed");
    }

    let pg_image = match pg_version {
        Some(tag) => format!("postgres:{tag}"),
        None => "postgres:latest".to_string(),
    };
    let pg_user = "greptimedb";
    let pg_password = "admin";

    let mut cmd = std::process::Command::new("docker");
    cmd.arg("run")
        .arg("-d")
        .arg("-e")
        .arg(format!("POSTGRES_PASSWORD={pg_password}"))
        .arg("-e")
        .arg(format!("POSTGRES_USER={pg_user}"))
        .arg("-p")
        .arg(format!("{pg_port}:5432"))
        .arg("--name")
        .arg("greptimedb_pg")
        .arg(&pg_image);

    println!("Starting PostgreSQL with command: {:?}", cmd);

    match cmd.status() {
        Ok(exit) if exit.success() => println!("Started PostgreSQL with port {}", pg_port),
        Ok(exit) => panic!("Failed to start PostgreSQL: {:?}", exit),
        failure @ Err(_) => panic!("Failed to start PostgreSQL: {:?}", failure),
    }
}
354
/// Launch a MySQL server as a detached docker container named
/// `greptimedb_mysql`.
///
/// Host port `mysql_port` is forwarded to container port 3306. The
/// `bitnami/mysql` image is used with tag `mysql_version` when given and
/// `5.7` otherwise. The container is created with user `greptimedb`, password
/// `admin` (also used as root password) and database `mysql`.
///
/// Panics if the `docker` command cannot be launched or `docker run` exits
/// unsuccessfully.
pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
    let docker_probe = std::process::Command::new("docker")
        .args(["-v"])
        .status();
    if docker_probe.is_err() {
        panic!("Docker is not installed");
    }

    let mysql_image = match mysql_version {
        Some(tag) => format!("bitnami/mysql:{tag}"),
        None => "bitnami/mysql:5.7".to_string(),
    };
    let mysql_user = "greptimedb";
    let mysql_password = "admin";

    let mut cmd = std::process::Command::new("docker");
    cmd.arg("run")
        .arg("-d")
        .arg("-e")
        .arg(format!("MYSQL_PASSWORD={mysql_password}"))
        .arg("-e")
        .arg(format!("MYSQL_USER={mysql_user}"))
        .arg("-e")
        .arg(format!("MYSQL_ROOT_PASSWORD={mysql_password}"))
        .arg("-e")
        .arg("MYSQL_DATABASE=mysql")
        .arg("-p")
        .arg(format!("{mysql_port}:3306"))
        .arg("--name")
        .arg("greptimedb_mysql")
        .arg(&mysql_image);

    println!("Starting MySQL with command: {:?}", cmd);

    match cmd.status() {
        Ok(exit) if exit.success() => println!("Started MySQL with port {}", mysql_port),
        Ok(exit) => panic!("Failed to start MySQL: {:?}", exit),
        failure @ Err(_) => panic!("Failed to start MySQL: {:?}", failure),
    }
}
411
412/// Get the dir of test cases. This function only works when the runner is run
413/// under the project's dir because it depends on some envs set by cargo.
414pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {
415    let runner_path = match case_dir {
416        Some(path) => path,
417        None => {
418            // retrieve the manifest runner (./tests/runner)
419            let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
420            // change directory to cases' dir from runner's (should be runner/../cases)
421            let _ = runner_crate_path.pop();
422            runner_crate_path.push("cases");
423            runner_crate_path
424        }
425    };
426
427    runner_path.into_os_string().into_string().unwrap()
428}
429
430/// Get the dir that contains workspace manifest (the top-level Cargo.toml).
431pub fn get_workspace_root() -> String {
432    // retrieve the manifest runner (./tests/runner)
433    let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
434
435    // change directory to workspace's root (runner/../..)
436    let _ = runner_crate_path.pop();
437    let _ = runner_crate_path.pop();
438
439    runner_crate_path.into_os_string().into_string().unwrap()
440}
441
442pub fn get_binary_dir(mode: &str) -> PathBuf {
443    // first go to the workspace root.
444    let mut workspace_root = PathBuf::from(get_workspace_root());
445
446    // change directory to target dir (workspace/target/<build mode>/)
447    workspace_root.push("target");
448    workspace_root.push(mode);
449
450    workspace_root
451}
452
453/// Spin-waiting a socket address is available, or timeout.
454/// Returns whether the addr is up.
455pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool {
456    let check_task = async {
457        loop {
458            let socket = TcpSocket::new_v4().expect("Cannot create v4 socket");
459            match socket.connect(ip_addr).await {
460                Ok(mut stream) => {
461                    let _ = stream.shutdown().await;
462                    break;
463                }
464                Err(_) => time::sleep(PORT_CHECK_INTERVAL).await,
465            }
466        }
467    };
468
469    tokio::time::timeout(timeout, check_task).await.is_ok()
470}
471
472/// Get the path of sqlness config dir `tests/conf`.
473pub fn sqlness_conf_path() -> PathBuf {
474    let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
475    sqlness_root_path.pop();
476    sqlness_root_path.push("conf");
477    sqlness_root_path
478}
479
480/// Start kafka cluster if needed. Config file is `conf/kafka-cluster.yml`.
481///
482/// ```shell
483/// docker compose -f kafka-cluster.yml up kafka -d --wait
484/// ```
485pub fn setup_wal() {
486    let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
487    sqlness_root_path.pop();
488    sqlness_root_path.push("conf");
489
490    Command::new("docker")
491        .current_dir(sqlness_conf_path())
492        .args([
493            "compose",
494            "-f",
495            "kafka-cluster.yml",
496            "up",
497            "kafka",
498            "-d",
499            "--wait",
500        ])
501        .output()
502        .expect("Failed to start kafka cluster");
503
504    println!("kafka cluster is up");
505}
506
507/// Stop kafka cluster if needed. Config file is `conf/kafka-cluster.yml`.
508///
509/// ```shell
510/// docker compose -f docker-compose-standalone.yml down kafka
511/// ```
512pub fn teardown_wal() {
513    let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
514    sqlness_root_path.pop();
515    sqlness_root_path.push("conf");
516
517    Command::new("docker")
518        .current_dir(sqlness_conf_path())
519        .args(["compose", "-f", "kafka-cluster.yml", "down", "kafka"])
520        .output()
521        .expect("Failed to stop kafka cluster");
522
523    println!("kafka cluster is down");
524}
525
/// Ask the OS for a currently free TCP port on `127.0.0.1` by binding to
/// port 0 and reading back the assigned port number.
///
/// The probing listener is dropped when the function returns, so the port is
/// not reserved for the caller.
///
/// Panics if binding or querying the bound address fails.
pub fn get_random_port() -> u16 {
    let probe =
        std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port");
    let bound_addr = probe.local_addr().expect("Failed to get local address");
    bound_addr.port()
}