1use std::future::Future;
17use std::sync::{Mutex, Once};
18
19use common_telemetry::info;
20use once_cell::sync::Lazy;
21use paste::paste;
22use serde::{Deserialize, Serialize};
23
24use crate::runtime::{BuilderBuild, RuntimeTrait};
25use crate::{Builder, JoinHandle, Runtime};
26
/// Worker-thread count of the fallback `global` runtime, used when no
/// pre-configured runtime is available.
const GLOBAL_WORKERS: usize = 8;
/// Worker-thread count of the fallback `compact` runtime.
const COMPACT_WORKERS: usize = 4;
/// Worker-thread count of the heartbeat runtime (fallback and configured alike).
const HB_WORKERS: usize = 2;
30
31#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
33pub struct RuntimeOptions {
34 pub global_rt_size: usize,
36 pub compact_rt_size: usize,
38}
39
40impl Default for RuntimeOptions {
41 fn default() -> Self {
42 let cpus = num_cpus::get();
43 Self {
44 global_rt_size: cpus,
45 compact_rt_size: usize::max(cpus / 2, 1),
46 }
47 }
48}
49
50pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
51 info!(
52 "Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}."
53 );
54 Builder::default()
55 .runtime_name(runtime_name)
56 .thread_name(thread_name)
57 .worker_threads(worker_threads)
58 .build()
59 .expect("Fail to create runtime")
60}
61
62struct GlobalRuntimes {
63 global_runtime: Runtime,
64 compact_runtime: Runtime,
65 hb_runtime: Runtime,
66}
67
/// Generates the `spawn_<name>`, `spawn_blocking_<name>` and `block_on_<name>`
/// methods, each forwarding to the `<name>_runtime` field of `self`.
macro_rules! define_spawn {
    ($name: ident) => {
        paste! {

            // Spawns `future` on the `<name>` runtime.
            fn [<spawn_ $name>]<F>(&self, future: F) -> JoinHandle<F::Output>
            where
                F: Future + Send + 'static,
                F::Output: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn(future)
            }

            // Hands the closure `func` to `spawn_blocking` of the `<name>` runtime.
            fn [<spawn_blocking_ $name>]<F, R>(&self, func: F) -> JoinHandle<R>
            where
                F: FnOnce() -> R + Send + 'static,
                R: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn_blocking(func)
            }

            // Passes `future` to `block_on` of the `<name>` runtime and returns its output.
            fn [<block_on_ $name>]<F: Future>(&self, future: F) -> F::Output {
                let runtime = &self.[<$name _runtime>];
                runtime.block_on(future)
            }
        }
    };
}
94
95impl GlobalRuntimes {
96 define_spawn!(global);
97 define_spawn!(compact);
98 define_spawn!(hb);
99
100 fn new(global: Option<Runtime>, compact: Option<Runtime>, heartbeat: Option<Runtime>) -> Self {
101 Self {
102 global_runtime: global
103 .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)),
104 compact_runtime: compact
105 .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
106 hb_runtime: heartbeat
107 .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
108 }
109 }
110}
111
112#[derive(Default)]
113struct ConfigRuntimes {
114 global_runtime: Option<Runtime>,
115 compact_runtime: Option<Runtime>,
116 hb_runtime: Option<Runtime>,
117 already_init: bool,
118}
119
120static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
121 let mut c = CONFIG_RUNTIMES.lock().unwrap();
122 let global = c.global_runtime.take();
123 let compact = c.compact_runtime.take();
124 let heartbeat = c.hb_runtime.take();
125 c.already_init = true;
126
127 GlobalRuntimes::new(global, compact, heartbeat)
128});
129
130static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
131 Lazy::new(|| Mutex::new(ConfigRuntimes::default()));
132
133pub fn init_global_runtimes(options: &RuntimeOptions) {
139 static START: Once = Once::new();
140 START.call_once(move || {
141 let mut c = CONFIG_RUNTIMES.lock().unwrap();
142 assert!(!c.already_init, "Global runtimes already initialized");
143 c.global_runtime = Some(create_runtime(
144 "global",
145 "global-worker",
146 options.global_rt_size,
147 ));
148 c.compact_runtime = Some(create_runtime(
149 "compact",
150 "compact-worker",
151 options.compact_rt_size,
152 ));
153 c.hb_runtime = Some(create_runtime("hreartbeat", "hb-worker", HB_WORKERS));
154 });
155}
156
157macro_rules! define_global_runtime_spawn {
158 ($type: ident) => {
159 paste! {
160 #[doc = "Returns the global `" $type "` thread pool."]
161 pub fn [<$type _runtime>]() -> Runtime {
162 GLOBAL_RUNTIMES.[<$type _runtime>].clone()
163 }
164
165 #[doc = "Spawn a future and execute it in `" $type "` thread pool."]
166 pub fn [<spawn_ $type>]<F>(future: F) -> JoinHandle<F::Output>
167 where
168 F: Future + Send + 'static,
169 F::Output: Send + 'static,
170 {
171 GLOBAL_RUNTIMES.[<spawn_ $type>](future)
172 }
173
174 #[doc = "Run the blocking operation in `" $type "` thread pool."]
175 pub fn [<spawn_blocking_ $type>]<F, R>(future: F) -> JoinHandle<R>
176 where
177 F: FnOnce() -> R + Send + 'static,
178 R: Send + 'static,
179 {
180 GLOBAL_RUNTIMES.[<spawn_blocking_ $type>](future)
181 }
182
183 #[doc = "Run a future to complete in `" $type "` thread pool."]
184 pub fn [<block_on_ $type>]<F: Future>(future: F) -> F::Output {
185 GLOBAL_RUNTIMES.[<block_on_ $type>](future)
186 }
187 }
188 };
189}
190
191define_global_runtime_spawn!(global);
192define_global_runtime_spawn!(compact);
193define_global_runtime_spawn!(hb);
194
#[cfg(test)]
mod tests {
    use tokio_test::assert_ok;

    use super::*;

    /// A future spawned on each runtime yields its value when its join handle
    /// is driven through the matching `block_on_*` helper.
    #[test]
    fn test_spawn_block_on() {
        let global_sum = block_on_global(spawn_global(async { 1 + 1 })).unwrap();
        assert_eq!(2, global_sum);

        let compact_sum = block_on_compact(spawn_compact(async { 2 + 2 })).unwrap();
        assert_eq!(4, compact_sum);

        let hb_sum = block_on_hb(spawn_hb(async { 4 + 4 })).unwrap();
        assert_eq!(8, hb_sum);
    }

    /// Generates a test that spawns an async task from inside a blocking task
    /// of the same runtime and checks that both complete successfully.
    macro_rules! define_spawn_blocking_test {
        ($name: ident) => {
            paste! {
                #[test]
                fn [<test_spawn_ $name _from_blocking>]() {
                    let greeting = [<$name _runtime>]().block_on(async {
                        // The blocking closure returns the handle of the task it spawned.
                        let blocking_task =
                            [<spawn_blocking_ $name>](|| [<spawn_ $name>](async { "hello" }));
                        let spawned = assert_ok!(blocking_task.await);

                        assert_ok!(spawned.await)
                    });

                    assert_eq!(greeting, "hello");
                }
            }
        };
    }

    define_spawn_blocking_test!(global);
    define_spawn_blocking_test!(compact);
    define_spawn_blocking_test!(hb);
}