1use std::io::Read;
16use std::net::SocketAddr;
17use std::path::{Path, PathBuf};
18use std::process::Command;
19use std::time::Duration;
20
21use sha2::{Digest, Sha256};
22use tokio::io::AsyncWriteExt;
23use tokio::net::TcpSocket;
24use tokio::time;
25use tokio_stream::StreamExt;
26
27const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100);
29
30#[cfg(not(windows))]
31pub const PROGRAM: &str = "./greptime";
32#[cfg(windows)]
33pub const PROGRAM: &str = "greptime.exe";
34
35fn http_proxy() -> Option<String> {
36 for proxy in ["http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY"] {
37 if let Ok(proxy_addr) = std::env::var(proxy) {
38 println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
39 return Some(proxy_addr);
40 }
41 }
42 None
43}
44
45fn https_proxy() -> Option<String> {
46 for proxy in ["https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"] {
47 if let Ok(proxy_addr) = std::env::var(proxy) {
48 println!("Getting Proxy from env var: {}={}", proxy, proxy_addr);
49 return Some(proxy_addr);
50 }
51 }
52 None
53}
54
55async fn download_files(url: &str, path: &str) {
56 let proxy = if url.starts_with("http://") {
57 http_proxy().map(|proxy| reqwest::Proxy::http(proxy).unwrap())
58 } else if url.starts_with("https://") {
59 https_proxy().map(|proxy| reqwest::Proxy::https(proxy).unwrap())
60 } else {
61 None
62 };
63
64 let client = proxy
65 .map(|proxy| {
66 reqwest::Client::builder()
67 .proxy(proxy)
68 .build()
69 .expect("Failed to build client")
70 })
71 .unwrap_or(reqwest::Client::new());
72
73 let mut file = tokio::fs::File::create(path)
74 .await
75 .unwrap_or_else(|_| panic!("Failed to create file in {path}"));
76 println!("Downloading {}...", url);
77
78 let resp = client
79 .get(url)
80 .send()
81 .await
82 .expect("Failed to send download request");
83 let len = resp.content_length();
84 let mut stream = resp.bytes_stream();
85 let mut size_downloaded = 0;
86
87 while let Some(chunk_result) = stream.next().await {
88 let chunk = chunk_result.unwrap();
89 size_downloaded += chunk.len();
90 if let Some(len) = len {
91 print!("\rDownloading {}/{} bytes", size_downloaded, len);
92 } else {
93 print!("\rDownloaded {} bytes", size_downloaded);
94 }
95
96 file.write_all(&chunk).await.unwrap();
97 }
98
99 file.flush().await.unwrap();
100
101 println!("\nDownloaded {}", url);
102}
103
104fn decompress(archive: &str, dest: &str) {
105 let tar = std::fs::File::open(archive).unwrap();
106 let dec = flate2::read::GzDecoder::new(tar);
107 let mut a = tar::Archive::new(dec);
108 a.unpack(dest).unwrap();
109}
110
111pub async fn pull_binary(version: &str) {
117 let os = std::env::consts::OS;
118 let arch = match std::env::consts::ARCH {
119 "x86_64" => "amd64",
120 "aarch64" => "arm64",
121 _ => panic!("Unsupported arch: {}", std::env::consts::ARCH),
122 };
123 let triple = format!("greptime-{}-{}-{}", os, arch, version);
124 let filename = format!("{triple}.tar.gz");
125
126 let url = format!(
127 "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{filename}"
128 );
129 println!("Downloading {version} binary from {}", url);
130
131 let _ = std::fs::create_dir(version);
133
134 let archive = Path::new(version).join(filename);
135 let folder_path = Path::new(version);
136
137 download_files(&url, &archive.to_string_lossy()).await;
139
140 let checksum_file = format!("{triple}.sha256sum");
141 let checksum_url = format!(
142 "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{checksum_file}"
143 );
144 download_files(
145 &checksum_url,
146 &PathBuf::from_iter([version, &checksum_file]).to_string_lossy(),
147 )
148 .await;
149
150 let mut file = std::fs::File::open(&archive).unwrap();
152 let mut sha256 = Sha256::new();
153 std::io::copy(&mut file, &mut sha256).unwrap();
154 let checksum: Vec<u8> = sha256.finalize().to_vec();
155
156 let mut expected_checksum =
157 std::fs::File::open(PathBuf::from_iter([version, &checksum_file])).unwrap();
158 let mut buf = String::new();
159 expected_checksum.read_to_string(&mut buf).unwrap();
160 let expected_checksum = hex::decode(buf.lines().next().unwrap()).unwrap();
161
162 assert_eq!(
163 checksum, expected_checksum,
164 "Checksum mismatched, downloaded file is corrupted"
165 );
166
167 decompress(&archive.to_string_lossy(), &folder_path.to_string_lossy());
168 println!("Downloaded and extracted {version} binary to {folder_path:?}");
169
170 std::fs::rename(
172 PathBuf::from_iter([version, &triple, "greptime"]),
173 PathBuf::from_iter([version, "greptime"]),
174 )
175 .unwrap();
176
177 std::fs::remove_file(&archive).unwrap();
179 std::fs::remove_dir(PathBuf::from_iter([version, &triple])).unwrap();
180}
181
182pub async fn maybe_pull_binary(version: &str, pull_version_on_need: bool) {
184 let exist = Path::new(version).join(PROGRAM).is_file();
185 match (exist, pull_version_on_need){
186 (true, _) => println!("Binary {version} exists"),
187 (false, false) => panic!("Binary {version} does not exist, please run with --pull-version-on-need or manually download it"),
188 (false, true) => { pull_binary(version).await; },
189 }
190}
191
192pub fn setup_etcd(client_ports: Vec<u16>, peer_port: Option<u16>, etcd_version: Option<&str>) {
194 if std::process::Command::new("docker")
195 .args(["-v"])
196 .status()
197 .is_err()
198 {
199 panic!("Docker is not installed");
200 }
201 let peer_port = peer_port.unwrap_or(2380);
202 let exposed_port: Vec<_> = client_ports.iter().chain(Some(&peer_port)).collect();
203 let exposed_port_str = exposed_port
204 .iter()
205 .flat_map(|p| ["-p".to_string(), format!("{p}:{p}")])
206 .collect::<Vec<_>>();
207 let etcd_version = etcd_version.unwrap_or("v3.5.17");
208 let etcd_image = format!("quay.io/coreos/etcd:{etcd_version}");
209 let peer_url = format!("http://0.0.0.0:{peer_port}");
210 let my_local_ip = local_ip_address::local_ip().unwrap();
211
212 let my_local_ip_str = my_local_ip.to_string();
213
214 let mut arg_list = vec![];
215 arg_list.extend([
216 "run",
217 "-d",
218 "-v",
219 "/usr/share/ca-certificates/:/etc/ssl/certs",
220 ]);
221 arg_list.extend(exposed_port_str.iter().map(std::ops::Deref::deref));
222 arg_list.extend([
223 "--name",
224 "etcd",
225 &etcd_image,
226 "etcd",
227 "-name",
228 "etcd0",
229 "-advertise-client-urls",
230 ]);
231
232 let adv_client_urls = client_ports
233 .iter()
234 .map(|p| format!("http://{my_local_ip_str}:{p}"))
235 .collect::<Vec<_>>()
236 .join(",");
237
238 arg_list.push(&adv_client_urls);
239
240 arg_list.extend(["-listen-client-urls"]);
241
242 let client_ports_fmt = client_ports
243 .iter()
244 .map(|p| format!("http://0.0.0.0:{p}"))
245 .collect::<Vec<_>>()
246 .join(",");
247
248 arg_list.push(&client_ports_fmt);
249
250 arg_list.push("-initial-advertise-peer-urls");
251 let advertise_peer_url = format!("http://{my_local_ip_str}:{peer_port}");
252 arg_list.push(&advertise_peer_url);
253
254 arg_list.extend(["-listen-peer-urls", &peer_url]);
255
256 arg_list.extend(["-initial-cluster-token", "etcd-cluster-1"]);
257
258 arg_list.push("-initial-cluster");
259
260 let init_cluster_url = format!("etcd0=http://{my_local_ip_str}:{peer_port}");
261
262 arg_list.push(&init_cluster_url);
263
264 arg_list.extend(["-initial-cluster-state", "new"]);
265
266 let mut cmd = std::process::Command::new("docker");
267
268 cmd.args(arg_list);
269
270 println!("Starting etcd with command: {:?}", cmd);
271
272 let status = cmd.status();
273 if status.is_err() {
274 panic!("Failed to start etcd: {:?}", status);
275 } else if let Ok(status) = status {
276 if status.success() {
277 println!(
278 "Started etcd with client ports {:?} and peer port {}, statues:{status:?}",
279 client_ports, peer_port
280 );
281 } else {
282 panic!("Failed to start etcd: {:?}", status);
283 }
284 }
285}
286
287pub fn stop_rm_etcd() {
289 let status = std::process::Command::new("docker")
290 .args(["container", "stop", "etcd"])
291 .status();
292 if status.is_err() {
293 panic!("Failed to stop etcd: {:?}", status);
294 } else {
295 println!("Stopped etcd");
296 }
297 let status = std::process::Command::new("docker")
299 .args(["container", "rm", "etcd"])
300 .status();
301 if status.is_err() {
302 panic!("Failed to remove etcd container: {:?}", status);
303 } else {
304 println!("Removed etcd container");
305 }
306}
307
308pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
310 if std::process::Command::new("docker")
311 .args(["-v"])
312 .status()
313 .is_err()
314 {
315 panic!("Docker is not installed");
316 }
317
318 let pg_image = if let Some(pg_version) = pg_version {
319 format!("postgres:{pg_version}")
320 } else {
321 "postgres:latest".to_string()
322 };
323 let pg_password = "admin";
324 let pg_user = "greptimedb";
325
326 let mut arg_list = vec![];
327 arg_list.extend(["run", "-d"]);
328
329 let pg_password_env = format!("POSTGRES_PASSWORD={pg_password}");
330 let pg_user_env = format!("POSTGRES_USER={pg_user}");
331 let pg_port_forward = format!("{pg_port}:5432");
332 arg_list.extend(["-e", &pg_password_env, "-e", &pg_user_env]);
333 arg_list.extend(["-p", &pg_port_forward]);
334
335 arg_list.extend(["--name", "greptimedb_pg", &pg_image]);
336
337 let mut cmd = std::process::Command::new("docker");
338
339 cmd.args(arg_list);
340
341 println!("Starting PostgreSQL with command: {:?}", cmd);
342
343 let status = cmd.status();
344 if status.is_err() {
345 panic!("Failed to start PostgreSQL: {:?}", status);
346 } else if let Ok(status) = status {
347 if status.success() {
348 println!("Started PostgreSQL with port {}", pg_port);
349 } else {
350 panic!("Failed to start PostgreSQL: {:?}", status);
351 }
352 }
353}
354
355pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
357 if std::process::Command::new("docker")
358 .args(["-v"])
359 .status()
360 .is_err()
361 {
362 panic!("Docker is not installed");
363 }
364
365 let mysql_image = if let Some(mysql_version) = mysql_version {
366 format!("bitnami/mysql:{mysql_version}")
367 } else {
368 "bitnami/mysql:5.7".to_string()
369 };
370 let mysql_password = "admin";
371 let mysql_user = "greptimedb";
372
373 let mut arg_list = vec![];
374 arg_list.extend(["run", "-d"]);
375
376 let mysql_password_env = format!("MYSQL_PASSWORD={mysql_password}");
377 let mysql_user_env = format!("MYSQL_USER={mysql_user}");
378 let mysql_root_password_env = format!("MYSQL_ROOT_PASSWORD={mysql_password}");
379 let mysql_port_forward = format!("{mysql_port}:3306");
380 arg_list.extend([
381 "-e",
382 &mysql_password_env,
383 "-e",
384 &mysql_user_env,
385 "-e",
386 &mysql_root_password_env,
387 "-e",
388 "MYSQL_DATABASE=mysql",
389 ]);
390 arg_list.extend(["-p", &mysql_port_forward]);
391
392 arg_list.extend(["--name", "greptimedb_mysql", &mysql_image]);
393
394 let mut cmd = std::process::Command::new("docker");
395
396 cmd.args(arg_list);
397
398 println!("Starting MySQL with command: {:?}", cmd);
399
400 let status = cmd.status();
401 if status.is_err() {
402 panic!("Failed to start MySQL: {:?}", status);
403 } else if let Ok(status) = status {
404 if status.success() {
405 println!("Started MySQL with port {}", mysql_port);
406 } else {
407 panic!("Failed to start MySQL: {:?}", status);
408 }
409 }
410}
411
412pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {
415 let runner_path = match case_dir {
416 Some(path) => path,
417 None => {
418 let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
420 let _ = runner_crate_path.pop();
422 runner_crate_path.push("cases");
423 runner_crate_path
424 }
425 };
426
427 runner_path.into_os_string().into_string().unwrap()
428}
429
430pub fn get_workspace_root() -> String {
432 let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
434
435 let _ = runner_crate_path.pop();
437 let _ = runner_crate_path.pop();
438
439 runner_crate_path.into_os_string().into_string().unwrap()
440}
441
442pub fn get_binary_dir(mode: &str) -> PathBuf {
443 let mut workspace_root = PathBuf::from(get_workspace_root());
445
446 workspace_root.push("target");
448 workspace_root.push(mode);
449
450 workspace_root
451}
452
453pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool {
456 let check_task = async {
457 loop {
458 let socket = TcpSocket::new_v4().expect("Cannot create v4 socket");
459 match socket.connect(ip_addr).await {
460 Ok(mut stream) => {
461 let _ = stream.shutdown().await;
462 break;
463 }
464 Err(_) => time::sleep(PORT_CHECK_INTERVAL).await,
465 }
466 }
467 };
468
469 tokio::time::timeout(timeout, check_task).await.is_ok()
470}
471
472pub fn sqlness_conf_path() -> PathBuf {
474 let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
475 sqlness_root_path.pop();
476 sqlness_root_path.push("conf");
477 sqlness_root_path
478}
479
480pub fn setup_wal() {
486 let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
487 sqlness_root_path.pop();
488 sqlness_root_path.push("conf");
489
490 Command::new("docker")
491 .current_dir(sqlness_conf_path())
492 .args([
493 "compose",
494 "-f",
495 "kafka-cluster.yml",
496 "up",
497 "kafka",
498 "-d",
499 "--wait",
500 ])
501 .output()
502 .expect("Failed to start kafka cluster");
503
504 println!("kafka cluster is up");
505}
506
507pub fn teardown_wal() {
513 let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
514 sqlness_root_path.pop();
515 sqlness_root_path.push("conf");
516
517 Command::new("docker")
518 .current_dir(sqlness_conf_path())
519 .args(["compose", "-f", "kafka-cluster.yml", "down", "kafka"])
520 .output()
521 .expect("Failed to stop kafka cluster");
522
523 println!("kafka cluster is down");
524}
525
526pub fn get_random_port() -> u16 {
528 use std::net::TcpListener;
529 let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port");
530 listener
531 .local_addr()
532 .expect("Failed to get local address")
533 .port()
534}