1use std::io::Read;
16use std::net::SocketAddr;
17use std::path::{Path, PathBuf};
18use std::process::Command;
19use std::time::Duration;
20
21use sha2::{Digest, Sha256};
22use tokio::io::AsyncWriteExt;
23use tokio::net::TcpSocket;
24use tokio::time;
25use tokio_stream::StreamExt;
26
/// Pause between two consecutive connection attempts made by `check_port`.
const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100);

/// File name of the binary that `maybe_pull_binary` looks for inside a version directory.
pub const PROGRAM: &str = "greptime";
31
/// Looks up the proxy to use for plain `http://` requests.
///
/// The environment variables `http_proxy`, `HTTP_PROXY`, `all_proxy` and
/// `ALL_PROXY` are consulted in that order and the first usable value is
/// returned. A variable that is unset, not valid Unicode, or blank is skipped,
/// so `http_proxy=""` falls through to the next candidate instead of yielding
/// an unusable empty proxy address.
///
/// Returns `None` when no candidate holds a usable value.
fn http_proxy() -> Option<String> {
    ["http_proxy", "HTTP_PROXY", "all_proxy", "ALL_PROXY"]
        .iter()
        .find_map(|&name| {
            let proxy_addr = std::env::var(name).ok()?;
            // An empty value conventionally means "no proxy"; keep searching.
            if proxy_addr.trim().is_empty() {
                return None;
            }
            println!("Getting Proxy from env var: {}={}", name, proxy_addr);
            Some(proxy_addr)
        })
}
41
/// Looks up the proxy to use for `https://` requests.
///
/// The environment variables `https_proxy`, `HTTPS_PROXY`, `all_proxy` and
/// `ALL_PROXY` are consulted in that order and the first usable value is
/// returned. A variable that is unset, not valid Unicode, or blank is skipped,
/// so `https_proxy=""` falls through to the next candidate instead of yielding
/// an unusable empty proxy address.
///
/// Returns `None` when no candidate holds a usable value.
fn https_proxy() -> Option<String> {
    ["https_proxy", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"]
        .iter()
        .find_map(|&name| {
            let proxy_addr = std::env::var(name).ok()?;
            // An empty value conventionally means "no proxy"; keep searching.
            if proxy_addr.trim().is_empty() {
                return None;
            }
            println!("Getting Proxy from env var: {}={}", name, proxy_addr);
            Some(proxy_addr)
        })
}
51
52async fn download_files(url: &str, path: &str) {
53 let proxy = if url.starts_with("http://") {
54 http_proxy().map(|proxy| reqwest::Proxy::http(proxy).unwrap())
55 } else if url.starts_with("https://") {
56 https_proxy().map(|proxy| reqwest::Proxy::https(proxy).unwrap())
57 } else {
58 None
59 };
60
61 let client = proxy
62 .map(|proxy| {
63 reqwest::Client::builder()
64 .proxy(proxy)
65 .build()
66 .expect("Failed to build client")
67 })
68 .unwrap_or(reqwest::Client::new());
69
70 let mut file = tokio::fs::File::create(path)
71 .await
72 .unwrap_or_else(|_| panic!("Failed to create file in {path}"));
73 println!("Downloading {}...", url);
74
75 let resp = client
76 .get(url)
77 .send()
78 .await
79 .expect("Failed to send download request");
80 let len = resp.content_length();
81 let mut stream = resp.bytes_stream();
82 let mut size_downloaded = 0;
83
84 while let Some(chunk_result) = stream.next().await {
85 let chunk = chunk_result.unwrap();
86 size_downloaded += chunk.len();
87 if let Some(len) = len {
88 print!("\rDownloading {}/{} bytes", size_downloaded, len);
89 } else {
90 print!("\rDownloaded {} bytes", size_downloaded);
91 }
92
93 file.write_all(&chunk).await.unwrap();
94 }
95
96 file.flush().await.unwrap();
97
98 println!("\nDownloaded {}", url);
99}
100
101fn decompress(archive: &str, dest: &str) {
102 let tar = std::fs::File::open(archive).unwrap();
103 let dec = flate2::read::GzDecoder::new(tar);
104 let mut a = tar::Archive::new(dec);
105 a.unpack(dest).unwrap();
106}
107
108pub async fn pull_binary(version: &str) {
114 let os = std::env::consts::OS;
115 let arch = match std::env::consts::ARCH {
116 "x86_64" => "amd64",
117 "aarch64" => "arm64",
118 _ => panic!("Unsupported arch: {}", std::env::consts::ARCH),
119 };
120 let triple = format!("greptime-{}-{}-{}", os, arch, version);
121 let filename = format!("{triple}.tar.gz");
122
123 let url = format!(
124 "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{filename}"
125 );
126 println!("Downloading {version} binary from {}", url);
127
128 let _ = std::fs::create_dir(version);
130
131 let archive = Path::new(version).join(filename);
132 let folder_path = Path::new(version);
133
134 download_files(&url, &archive.to_string_lossy()).await;
136
137 let checksum_file = format!("{triple}.sha256sum");
138 let checksum_url = format!(
139 "https://github.com/GreptimeTeam/greptimedb/releases/download/{version}/{checksum_file}"
140 );
141 download_files(
142 &checksum_url,
143 &PathBuf::from_iter([version, &checksum_file]).to_string_lossy(),
144 )
145 .await;
146
147 let mut file = std::fs::File::open(&archive).unwrap();
149 let mut sha256 = Sha256::new();
150 std::io::copy(&mut file, &mut sha256).unwrap();
151 let checksum: Vec<u8> = sha256.finalize().to_vec();
152
153 let mut expected_checksum =
154 std::fs::File::open(PathBuf::from_iter([version, &checksum_file])).unwrap();
155 let mut buf = String::new();
156 expected_checksum.read_to_string(&mut buf).unwrap();
157 let expected_checksum = hex::decode(buf.lines().next().unwrap()).unwrap();
158
159 assert_eq!(
160 checksum, expected_checksum,
161 "Checksum mismatched, downloaded file is corrupted"
162 );
163
164 decompress(&archive.to_string_lossy(), &folder_path.to_string_lossy());
165 println!("Downloaded and extracted {version} binary to {folder_path:?}");
166
167 std::fs::rename(
169 PathBuf::from_iter([version, &triple, "greptime"]),
170 PathBuf::from_iter([version, "greptime"]),
171 )
172 .unwrap();
173
174 std::fs::remove_file(&archive).unwrap();
176 std::fs::remove_dir(PathBuf::from_iter([version, &triple])).unwrap();
177}
178
179pub async fn maybe_pull_binary(version: &str, pull_version_on_need: bool) {
181 let exist = Path::new(version).join(PROGRAM).is_file();
182 match (exist, pull_version_on_need) {
183 (true, _) => println!("Binary {version} exists"),
184 (false, false) => panic!(
185 "Binary {version} does not exist, please run with --pull-version-on-need or manually download it"
186 ),
187 (false, true) => {
188 pull_binary(version).await;
189 }
190 }
191}
192
193pub fn setup_etcd(client_ports: Vec<u16>, peer_port: Option<u16>, etcd_version: Option<&str>) {
195 if std::process::Command::new("docker")
196 .args(["-v"])
197 .status()
198 .is_err()
199 {
200 panic!("Docker is not installed");
201 }
202 let peer_port = peer_port.unwrap_or(2380);
203 let exposed_port: Vec<_> = client_ports.iter().chain(Some(&peer_port)).collect();
204 let exposed_port_str = exposed_port
205 .iter()
206 .flat_map(|p| ["-p".to_string(), format!("{p}:{p}")])
207 .collect::<Vec<_>>();
208 let etcd_version = etcd_version.unwrap_or("v3.5.17");
209 let etcd_image = format!("quay.io/coreos/etcd:{etcd_version}");
210 let peer_url = format!("http://0.0.0.0:{peer_port}");
211 let my_local_ip = local_ip_address::local_ip().unwrap();
212
213 let my_local_ip_str = my_local_ip.to_string();
214
215 let mut arg_list = vec![];
216 arg_list.extend([
217 "run",
218 "-d",
219 "-v",
220 "/usr/share/ca-certificates/:/etc/ssl/certs",
221 ]);
222 arg_list.extend(exposed_port_str.iter().map(std::ops::Deref::deref));
223 arg_list.extend([
224 "--name",
225 "etcd",
226 &etcd_image,
227 "etcd",
228 "-name",
229 "etcd0",
230 "-advertise-client-urls",
231 ]);
232
233 let adv_client_urls = client_ports
234 .iter()
235 .map(|p| format!("http://{my_local_ip_str}:{p}"))
236 .collect::<Vec<_>>()
237 .join(",");
238
239 arg_list.push(&adv_client_urls);
240
241 arg_list.extend(["-listen-client-urls"]);
242
243 let client_ports_fmt = client_ports
244 .iter()
245 .map(|p| format!("http://0.0.0.0:{p}"))
246 .collect::<Vec<_>>()
247 .join(",");
248
249 arg_list.push(&client_ports_fmt);
250
251 arg_list.push("-initial-advertise-peer-urls");
252 let advertise_peer_url = format!("http://{my_local_ip_str}:{peer_port}");
253 arg_list.push(&advertise_peer_url);
254
255 arg_list.extend(["-listen-peer-urls", &peer_url]);
256
257 arg_list.extend(["-initial-cluster-token", "etcd-cluster-1"]);
258
259 arg_list.push("-initial-cluster");
260
261 let init_cluster_url = format!("etcd0=http://{my_local_ip_str}:{peer_port}");
262
263 arg_list.push(&init_cluster_url);
264
265 arg_list.extend(["-initial-cluster-state", "new"]);
266
267 let mut cmd = std::process::Command::new("docker");
268
269 cmd.args(arg_list);
270
271 println!("Starting etcd with command: {:?}", cmd);
272
273 let status = cmd.status();
274 if status.is_err() {
275 panic!("Failed to start etcd: {:?}", status);
276 } else if let Ok(status) = status {
277 if status.success() {
278 println!(
279 "Started etcd with client ports {:?} and peer port {}, statues:{status:?}",
280 client_ports, peer_port
281 );
282 } else {
283 panic!("Failed to start etcd: {:?}", status);
284 }
285 }
286}
287
/// Stops and then removes the docker container named `etcd`.
///
/// Each step reports what actually happened: success is printed only when
/// docker exits successfully, and a non-zero exit (for example because the
/// container does not exist) is reported without aborting, so the removal is
/// still attempted after a failed stop.
///
/// # Panics
///
/// Panics if the `docker` command itself cannot be run.
pub fn stop_rm_etcd() {
    let status = std::process::Command::new("docker")
        .args(["container", "stop", "etcd"])
        .status();
    match &status {
        Ok(exit) if exit.success() => println!("Stopped etcd"),
        Ok(exit) => println!("Failed to stop etcd, docker returned {exit}"),
        Err(_) => panic!("Failed to stop etcd: {:?}", status),
    }

    let status = std::process::Command::new("docker")
        .args(["container", "rm", "etcd"])
        .status();
    match &status {
        Ok(exit) if exit.success() => println!("Removed etcd container"),
        Ok(exit) => println!("Failed to remove etcd container, docker returned {exit}"),
        Err(_) => panic!("Failed to remove etcd container: {:?}", status),
    }
}
308
/// Starts a detached PostgreSQL container named `greptimedb_pg` through docker.
///
/// * `pg_port` - host port forwarded to the container's port `5432`.
/// * `pg_version` - tag of the `postgres` image, `latest` when `None`.
///
/// The container is created with user `greptimedb` and password `admin`.
///
/// # Panics
///
/// Panics if the `docker` command cannot be run or `docker run` exits
/// unsuccessfully.
pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
    let docker_probe = std::process::Command::new("docker")
        .args(["-v"])
        .status();
    if docker_probe.is_err() {
        panic!("Docker is not installed");
    }

    let pg_password = "admin";
    let pg_user = "greptimedb";
    let pg_image = match pg_version {
        Some(pg_version) => format!("postgres:{pg_version}"),
        None => "postgres:latest".to_string(),
    };

    let mut cmd = std::process::Command::new("docker");
    cmd.args(["run", "-d"]);
    cmd.arg("-e").arg(format!("POSTGRES_PASSWORD={pg_password}"));
    cmd.arg("-e").arg(format!("POSTGRES_USER={pg_user}"));
    cmd.arg("-p").arg(format!("{pg_port}:5432"));
    cmd.args(["--name", "greptimedb_pg"]).arg(&pg_image);

    println!("Starting PostgreSQL with command: {:?}", cmd);

    match cmd.status() {
        Ok(status) if status.success() => println!("Started PostgreSQL with port {}", pg_port),
        Ok(status) => panic!("Failed to start PostgreSQL: {:?}", status),
        status @ Err(_) => panic!("Failed to start PostgreSQL: {:?}", status),
    }
}
355
/// Starts a detached MySQL container named `greptimedb_mysql` through docker.
///
/// * `mysql_port` - host port forwarded to the container's port `3306`.
/// * `mysql_version` - tag of the `greptime/mysql` image, `5.7` when `None`.
///
/// The container is created with user `greptimedb`, password `admin` (also
/// used as the root password) and database `mysql`.
///
/// # Panics
///
/// Panics if the `docker` command cannot be run or `docker run` exits
/// unsuccessfully.
pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
    let docker_probe = std::process::Command::new("docker")
        .args(["-v"])
        .status();
    if docker_probe.is_err() {
        panic!("Docker is not installed");
    }

    let mysql_password = "admin";
    let mysql_user = "greptimedb";
    let mysql_image = match mysql_version {
        Some(mysql_version) => format!("greptime/mysql:{mysql_version}"),
        None => "greptime/mysql:5.7".to_string(),
    };

    let mut cmd = std::process::Command::new("docker");
    cmd.args(["run", "-d"]);
    cmd.arg("-e").arg(format!("MYSQL_PASSWORD={mysql_password}"));
    cmd.arg("-e").arg(format!("MYSQL_USER={mysql_user}"));
    cmd.arg("-e")
        .arg(format!("MYSQL_ROOT_PASSWORD={mysql_password}"));
    cmd.args(["-e", "MYSQL_DATABASE=mysql"]);
    cmd.arg("-p").arg(format!("{mysql_port}:3306"));
    cmd.args(["--name", "greptimedb_mysql"]).arg(&mysql_image);

    println!("Starting MySQL with command: {:?}", cmd);

    match cmd.status() {
        Ok(status) if status.success() => println!("Started MySQL with port {}", mysql_port),
        Ok(status) => panic!("Failed to start MySQL: {:?}", status),
        status @ Err(_) => panic!("Failed to start MySQL: {:?}", status),
    }
}
412
413pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {
416 let runner_path = match case_dir {
417 Some(path) => path,
418 None => {
419 let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
421 let _ = runner_crate_path.pop();
423 runner_crate_path.push("cases");
424 runner_crate_path
425 }
426 };
427
428 runner_path.into_os_string().into_string().unwrap()
429}
430
431pub fn get_workspace_root() -> String {
433 let mut runner_crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
435
436 let _ = runner_crate_path.pop();
438 let _ = runner_crate_path.pop();
439
440 runner_crate_path.into_os_string().into_string().unwrap()
441}
442
443pub fn get_binary_dir(mode: &str) -> PathBuf {
444 let mut workspace_root = PathBuf::from(get_workspace_root());
446
447 workspace_root.push("target");
449 workspace_root.push(mode);
450
451 workspace_root
452}
453
454pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool {
457 let check_task = async {
458 loop {
459 let socket = TcpSocket::new_v4().expect("Cannot create v4 socket");
460 match socket.connect(ip_addr).await {
461 Ok(mut stream) => {
462 let _ = stream.shutdown().await;
463 break;
464 }
465 Err(_) => time::sleep(PORT_CHECK_INTERVAL).await,
466 }
467 }
468 };
469
470 tokio::time::timeout(timeout, check_task).await.is_ok()
471}
472
473pub fn sqlness_conf_path() -> PathBuf {
475 let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
476 sqlness_root_path.pop();
477 sqlness_root_path.push("conf");
478 sqlness_root_path
479}
480
481pub fn setup_wal() {
487 let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
488 sqlness_root_path.pop();
489 sqlness_root_path.push("conf");
490
491 Command::new("docker")
492 .current_dir(sqlness_conf_path())
493 .args([
494 "compose",
495 "-f",
496 "kafka-cluster.yml",
497 "up",
498 "kafka",
499 "-d",
500 "--wait",
501 ])
502 .output()
503 .expect("Failed to start kafka cluster");
504
505 println!("kafka cluster is up");
506}
507
508pub fn teardown_wal() {
514 let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
515 sqlness_root_path.pop();
516 sqlness_root_path.push("conf");
517
518 Command::new("docker")
519 .current_dir(sqlness_conf_path())
520 .args(["compose", "-f", "kafka-cluster.yml", "down", "kafka"])
521 .output()
522 .expect("Failed to stop kafka cluster");
523
524 println!("kafka cluster is down");
525}
526
/// Returns a port number that the operating system handed out for a listener
/// bound to `127.0.0.1:0`.
///
/// The listener is dropped before returning, so the port is only known to have
/// been free at the time of the call.
///
/// # Panics
///
/// Panics if binding fails or the bound address cannot be queried.
pub fn get_random_port() -> u16 {
    let probe =
        std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port");
    let bound_addr = probe.local_addr().expect("Failed to get local address");
    bound_addr.port()
}