1use std::future::Future;
17use std::sync::{Mutex, Once};
18
19use common_telemetry::info;
20use once_cell::sync::Lazy;
21use paste::paste;
22use serde::{Deserialize, Serialize};
23
24use crate::runtime::{BuilderBuild, RuntimeTrait};
25use crate::{Builder, JoinHandle, Runtime};
26
/// Worker-thread count for the `global` runtime when no pre-built runtime is supplied
/// to `GlobalRuntimes::new`.
const GLOBAL_WORKERS: usize = 8;
/// Worker-thread count for the `compact` runtime when no pre-built runtime is supplied
/// to `GlobalRuntimes::new`.
const COMPACT_WORKERS: usize = 4;
/// Worker-thread count for the heartbeat runtime; used both by the fallback in
/// `GlobalRuntimes::new` and by `init_global_runtimes`.
const HB_WORKERS: usize = 2;
30
31#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
33pub struct RuntimeOptions {
34 pub global_rt_size: usize,
36 pub compact_rt_size: usize,
38}
39
40impl Default for RuntimeOptions {
41 fn default() -> Self {
42 let cpus = num_cpus::get();
43 Self {
44 global_rt_size: cpus,
45 compact_rt_size: usize::max(cpus / 2, 1),
46 }
47 }
48}
49
50pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
51 info!("Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}.");
52 Builder::default()
53 .runtime_name(runtime_name)
54 .thread_name(thread_name)
55 .worker_threads(worker_threads)
56 .build()
57 .expect("Fail to create runtime")
58}
59
60struct GlobalRuntimes {
61 global_runtime: Runtime,
62 compact_runtime: Runtime,
63 hb_runtime: Runtime,
64}
65
/// Generates `spawn_<name>`, `spawn_blocking_<name>` and `block_on_<name>` methods that
/// forward to the `<name>_runtime` field of `self`.
macro_rules! define_spawn {
    ($name:ident) => {
        paste! {
            /// Spawns `future` on this runtime and returns its join handle.
            fn [<spawn_ $name>]<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output>
            where
                F::Output: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn(future)
            }

            /// Hands the blocking closure to this runtime's `spawn_blocking`.
            fn [<spawn_blocking_ $name>]<F: FnOnce() -> R + Send + 'static, R: Send + 'static>(
                &self,
                task: F,
            ) -> JoinHandle<R> {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn_blocking(task)
            }

            /// Passes `future` to this runtime's `block_on` and returns its output.
            fn [<block_on_ $name>]<F: Future>(&self, future: F) -> F::Output {
                let runtime = &self.[<$name _runtime>];
                runtime.block_on(future)
            }
        }
    };
}
92
93impl GlobalRuntimes {
94 define_spawn!(global);
95 define_spawn!(compact);
96 define_spawn!(hb);
97
98 fn new(global: Option<Runtime>, compact: Option<Runtime>, heartbeat: Option<Runtime>) -> Self {
99 Self {
100 global_runtime: global
101 .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)),
102 compact_runtime: compact
103 .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
104 hb_runtime: heartbeat
105 .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
106 }
107 }
108}
109
110#[derive(Default)]
111struct ConfigRuntimes {
112 global_runtime: Option<Runtime>,
113 compact_runtime: Option<Runtime>,
114 hb_runtime: Option<Runtime>,
115 already_init: bool,
116}
117
118static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
119 let mut c = CONFIG_RUNTIMES.lock().unwrap();
120 let global = c.global_runtime.take();
121 let compact = c.compact_runtime.take();
122 let heartbeat = c.hb_runtime.take();
123 c.already_init = true;
124
125 GlobalRuntimes::new(global, compact, heartbeat)
126});
127
128static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
129 Lazy::new(|| Mutex::new(ConfigRuntimes::default()));
130
131pub fn init_global_runtimes(options: &RuntimeOptions) {
137 static START: Once = Once::new();
138 START.call_once(move || {
139 let mut c = CONFIG_RUNTIMES.lock().unwrap();
140 assert!(!c.already_init, "Global runtimes already initialized");
141 c.global_runtime = Some(create_runtime(
142 "global",
143 "global-worker",
144 options.global_rt_size,
145 ));
146 c.compact_runtime = Some(create_runtime(
147 "compact",
148 "compact-worker",
149 options.compact_rt_size,
150 ));
151 c.hb_runtime = Some(create_runtime("hreartbeat", "hb-worker", HB_WORKERS));
152 });
153}
154
/// Generates the public free functions for one global runtime: an accessor
/// (`<name>_runtime`) plus `spawn_<name>`, `spawn_blocking_<name>` and `block_on_<name>`,
/// all backed by `GLOBAL_RUNTIMES`.
macro_rules! define_global_runtime_spawn {
    ($name:ident) => {
        paste! {
            #[doc = "Returns the global `" $name "` thread pool."]
            pub fn [<$name _runtime>]() -> Runtime {
                let runtimes: &GlobalRuntimes = &GLOBAL_RUNTIMES;
                runtimes.[<$name _runtime>].clone()
            }

            #[doc = "Spawn a future and execute it in `" $name "` thread pool."]
            pub fn [<spawn_ $name>]<F: Future + Send + 'static>(future: F) -> JoinHandle<F::Output>
            where
                F::Output: Send + 'static,
            {
                let runtimes: &GlobalRuntimes = &GLOBAL_RUNTIMES;
                runtimes.[<spawn_ $name>](future)
            }

            #[doc = "Run the blocking operation in `" $name "` thread pool."]
            pub fn [<spawn_blocking_ $name>]<F: FnOnce() -> R + Send + 'static, R: Send + 'static>(
                future: F,
            ) -> JoinHandle<R> {
                let runtimes: &GlobalRuntimes = &GLOBAL_RUNTIMES;
                runtimes.[<spawn_blocking_ $name>](future)
            }

            #[doc = "Run a future to completion in `" $name "` thread pool."]
            pub fn [<block_on_ $name>]<F: Future>(future: F) -> F::Output {
                let runtimes: &GlobalRuntimes = &GLOBAL_RUNTIMES;
                runtimes.[<block_on_ $name>](future)
            }
        }
    };
}
188
189define_global_runtime_spawn!(global);
190define_global_runtime_spawn!(compact);
191define_global_runtime_spawn!(hb);
192
#[cfg(test)]
mod tests {
    use tokio_test::assert_ok;

    use super::*;

    /// Every global runtime can spawn a future and drive its join handle to completion.
    #[test]
    fn test_spawn_block_on() {
        let global_sum = block_on_global(spawn_global(async { 1 + 1 })).unwrap();
        assert_eq!(2, global_sum);

        let compact_sum = block_on_compact(spawn_compact(async { 2 + 2 })).unwrap();
        assert_eq!(4, compact_sum);

        let hb_sum = block_on_hb(spawn_hb(async { 4 + 4 })).unwrap();
        assert_eq!(8, hb_sum);
    }

    /// Generates a test checking that a task spawned from inside a blocking closure of
    /// the given runtime completes and yields its value.
    macro_rules! define_spawn_blocking_test {
        ($name:ident) => {
            paste! {
                #[test]
                fn [<test_spawn_ $name _from_blocking>]() {
                    let rt = [<$name _runtime>]();
                    let greeting = rt.block_on(async move {
                        // The blocking closure itself spawns an async task and returns
                        // that task's join handle.
                        let blocking_task = [<spawn_blocking_ $name>](move || {
                            [<spawn_ $name>](async move { "hello" })
                        });
                        let spawned = assert_ok!(blocking_task.await);

                        assert_ok!(spawned.await)
                    });

                    assert_eq!(greeting, "hello");
                }
            }
        };
    }

    define_spawn_blocking_test!(global);
    define_spawn_blocking_test!(compact);
    define_spawn_blocking_test!(hb);
}