common_runtime_bin/
bin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use clap::Parser;
16
// Command-line arguments of this binary.
//
// Plain `//` comments are used here on purpose: with clap's derive API, `///`
// doc comments on the struct or its fields are turned into help text, which
// would change what the program prints.
#[derive(Debug, Default, Parser)]
pub struct Command {
    // Forwarded by `main` to `test_diff_workload_priority`, where it is the
    // number of tasks spawned for each (workload, priority) combination.
    // NOTE(review): presumably exposed as `--loop-cnt` and required, since no
    // default is declared — confirm against the clap version in use.
    #[clap(long)]
    loop_cnt: usize,
}
22
23fn main() {
24    common_telemetry::init_default_ut_logging();
25    let cmd = Command::parse();
26
27    test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt);
28}
29
30mod test_diff_priority_cpu {
31    use std::path::PathBuf;
32
33    use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
34    use common_runtime::{Builder, Runtime};
35    use common_telemetry::debug;
36    use tempfile::TempDir;
37
38    fn compute_pi_str(precision: usize) -> String {
39        let mut pi = 0.0;
40        let mut sign = 1.0;
41
42        for i in 0..precision {
43            pi += sign / (2 * i + 1) as f64;
44            sign *= -1.0;
45        }
46
47        pi *= 4.0;
48        format!("{:.prec$}", pi, prec = precision)
49    }
50
51    macro_rules! def_workload_enum {
52        ($($variant:ident),+) => {
53            #[derive(Debug)]
54            enum WorkloadType {
55                $($variant),+
56            }
57
58            /// array of workloads for iteration
59            const WORKLOADS: &'static [WorkloadType] = &[
60                $( WorkloadType::$variant ),+
61            ];
62        };
63    }
64
    // Workload kinds under test. The order given here is the order of the
    // generated `WORKLOADS` slice, which `test_diff_workload_priority`
    // iterates over.
    def_workload_enum!(
        ComputeHeavily,
        ComputeHeavily2,
        WriteFile,
        SpawnBlockingWriteFile
    );
71
72    async fn workload_compute_heavily() {
73        let prefix = 10;
74
75        for _ in 0..3000 {
76            let _ = compute_pi_str(prefix);
77            tokio::task::yield_now().await;
78        }
79    }
80    async fn workload_compute_heavily2() {
81        let prefix = 30;
82        for _ in 0..2000 {
83            let _ = compute_pi_str(prefix);
84            tokio::task::yield_now().await;
85        }
86    }
87    async fn workload_write_file(_idx: u64, tempdir: PathBuf) {
88        use tokio::io::AsyncWriteExt;
89        let prefix = 50;
90
91        let mut file = tokio::fs::OpenOptions::new()
92            .write(true)
93            .append(true)
94            .create(true)
95            .open(tempdir.join(format!("pi_{}", prefix)))
96            .await
97            .unwrap();
98        for i in 0..200 {
99            let pi = compute_pi_str(prefix);
100
101            if i % 2 == 0 {
102                file.write_all(pi.as_bytes()).await.unwrap();
103            }
104        }
105    }
106    async fn workload_spawn_blocking_write_file(tempdir: PathBuf) {
107        use std::io::Write;
108        let prefix = 100;
109        let mut file = Some(
110            std::fs::OpenOptions::new()
111                .append(true)
112                .create(true)
113                .open(tempdir.join(format!("pi_{}", prefix)))
114                .unwrap(),
115        );
116        for i in 0..100 {
117            let pi = compute_pi_str(prefix);
118            if i % 2 == 0 {
119                let mut file1 = file.take().unwrap();
120                file = Some(
121                    tokio::task::spawn_blocking(move || {
122                        file1.write_all(pi.as_bytes()).unwrap();
123                        file1
124                    })
125                    .await
126                    .unwrap(),
127                );
128            }
129        }
130    }
131
132    pub fn test_diff_workload_priority(loop_cnt: usize) {
133        let tempdir = tempfile::tempdir().unwrap();
134        let priorities = [
135            Priority::VeryLow,
136            Priority::Low,
137            Priority::Middle,
138            Priority::High,
139            Priority::VeryHigh,
140        ];
141        for wl in WORKLOADS {
142            for p in priorities.iter() {
143                let runtime: Runtime = Builder::default()
144                    .runtime_name("test")
145                    .thread_name("test")
146                    .worker_threads(8)
147                    .priority(*p)
148                    .build()
149                    .expect("Fail to create runtime");
150                let runtime2 = runtime.clone();
151                runtime.block_on(test_spec_priority_and_workload(
152                    *p, runtime2, wl, &tempdir, loop_cnt,
153                ));
154            }
155        }
156    }
157
158    async fn test_spec_priority_and_workload(
159        priority: Priority,
160        runtime: Runtime,
161        workload_id: &WorkloadType,
162        tempdir: &TempDir,
163        loop_cnt: usize,
164    ) {
165        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
166        debug!(
167            "testing cpu usage for priority {:?} workload_id {:?}",
168            priority, workload_id,
169        );
170        // start monitor thread
171        let mut tasks = vec![];
172        let start = std::time::Instant::now();
173        for i in 0..loop_cnt {
174            // persist cpu usage in json: {priority}.{workload_id}
175            match *workload_id {
176                WorkloadType::ComputeHeavily => {
177                    tasks.push(runtime.spawn(workload_compute_heavily()));
178                }
179                WorkloadType::ComputeHeavily2 => {
180                    tasks.push(runtime.spawn(workload_compute_heavily2()));
181                }
182                WorkloadType::SpawnBlockingWriteFile => {
183                    tasks.push(runtime.spawn(workload_spawn_blocking_write_file(
184                        tempdir.path().to_path_buf(),
185                    )));
186                }
187                WorkloadType::WriteFile => {
188                    tasks.push(
189                        runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())),
190                    );
191                }
192            }
193        }
194        for task in tasks {
195            task.await.unwrap();
196        }
197        let elapsed = start.elapsed();
198        debug!(
199            "test cpu usage for priority {:?} workload_id {:?} elapsed {}ms",
200            priority,
201            workload_id,
202            elapsed.as_millis()
203        );
204    }
205}