1use std::future::Future;
17use std::sync::{Mutex, Once};
18
19use common_telemetry::info;
20use once_cell::sync::Lazy;
21use paste::paste;
22use serde::{Deserialize, Serialize};
23
24use crate::runtime::{BuilderBuild, RuntimeTrait};
25use crate::{Builder, JoinHandle, Runtime};
26
/// Worker-thread count of the fallback "global" runtime that `GlobalRuntimes::new`
/// creates when no global runtime was supplied.
const GLOBAL_WORKERS: usize = 8;
/// Worker-thread count of the fallback "compact" runtime that `GlobalRuntimes::new`
/// creates when no compact runtime was supplied.
const COMPACT_WORKERS: usize = 4;
/// Worker-thread count of the "heartbeat" runtime, used both by `init_global_runtimes`
/// and by the fallback in `GlobalRuntimes::new`.
const HB_WORKERS: usize = 2;
30
31#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
33#[serde(default)]
34pub struct RuntimeOptions {
35 pub global_rt_size: usize,
37 pub compact_rt_size: usize,
39 pub query_rt_size: usize,
41 pub ingest_rt_size: usize,
43}
44
45impl Default for RuntimeOptions {
46 fn default() -> Self {
47 let cpus = num_cpus::get();
48 Self {
49 global_rt_size: cpus,
50 compact_rt_size: usize::max(cpus / 2, 1),
51 query_rt_size: usize::max(cpus.saturating_sub(1), 1),
52 ingest_rt_size: cpus,
53 }
54 }
55}
56
57pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
58 info!(
59 "Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}."
60 );
61 Builder::default()
62 .runtime_name(runtime_name)
63 .thread_name(thread_name)
64 .worker_threads(worker_threads)
65 .build()
66 .expect("Fail to create runtime")
67}
68
/// The fully resolved set of runtimes held by the `GLOBAL_RUNTIMES` static.
///
/// Every slot is populated: `GlobalRuntimes::new` substitutes a fallback for each
/// runtime that was not supplied.
struct GlobalRuntimes {
    /// Runtime behind the `*_global` helpers; also the fallback for query and ingest.
    global_runtime: Runtime,
    /// Runtime behind the `*_compact` helpers.
    compact_runtime: Runtime,
    /// Runtime behind the `*_hb` (heartbeat) helpers.
    hb_runtime: Runtime,
    /// Runtime behind the `*_query` helpers.
    query_runtime: Runtime,
    /// Runtime behind the `*_ingest` helpers.
    ingest_runtime: Runtime,
}
76
/// Generates three private forwarding methods for the `<name>_runtime` field of the
/// enclosing type: `spawn_<name>`, `spawn_blocking_<name>` and `block_on_<name>`.
macro_rules! define_spawn {
    ($name:ident) => {
        paste! {
            // Forwards the future to `spawn` on the matching runtime field.
            fn [<spawn_ $name>]<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
            where
                Fut: Future + Send + 'static,
                Fut::Output: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn(future)
            }

            // Forwards the closure to `spawn_blocking` on the matching runtime field.
            fn [<spawn_blocking_ $name>]<Func, Out>(&self, future: Func) -> JoinHandle<Out>
            where
                Func: FnOnce() -> Out + Send + 'static,
                Out: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn_blocking(future)
            }

            // Forwards the future to `block_on` on the matching runtime field.
            fn [<block_on_ $name>]<Fut>(&self, future: Fut) -> Fut::Output
            where
                Fut: Future,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.block_on(future)
            }
        }
    };
}
103
104impl GlobalRuntimes {
105 define_spawn!(global);
106 define_spawn!(compact);
107 define_spawn!(hb);
108 define_spawn!(query);
109 define_spawn!(ingest);
110
111 fn new(
112 global: Option<Runtime>,
113 compact: Option<Runtime>,
114 heartbeat: Option<Runtime>,
115 query: Option<Runtime>,
116 ingest: Option<Runtime>,
117 ) -> Self {
118 let global_runtime =
119 global.unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS));
120 let query_runtime = query.unwrap_or_else(|| global_runtime.clone());
121 let ingest_runtime = ingest.unwrap_or_else(|| global_runtime.clone());
122
123 Self {
124 global_runtime,
125 compact_runtime: compact
126 .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
127 hb_runtime: heartbeat
128 .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
129 query_runtime,
130 ingest_runtime,
131 }
132 }
133}
134
/// Runtimes staged by `init_global_runtimes` / `init_datanode_runtimes` until the
/// `GLOBAL_RUNTIMES` initializer takes them.
///
/// A slot left as `None` makes `GlobalRuntimes::new` use its fallback for that runtime.
#[derive(Default)]
struct ConfigRuntimes {
    /// Staged "global" runtime, if any.
    global_runtime: Option<Runtime>,
    /// Staged "compact" runtime, if any.
    compact_runtime: Option<Runtime>,
    /// Staged "heartbeat" runtime, if any.
    hb_runtime: Option<Runtime>,
    /// Staged "query" runtime, if any.
    query_runtime: Option<Runtime>,
    /// Staged "ingest" runtime, if any.
    ingest_runtime: Option<Runtime>,
    /// Set to `true` by the `GLOBAL_RUNTIMES` initializer once it has consumed this
    /// configuration; the `init_*` functions assert that it is still `false`.
    already_init: bool,
}
144
145static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
146 let mut c = CONFIG_RUNTIMES.lock().unwrap();
147 let global = c.global_runtime.take();
148 let compact = c.compact_runtime.take();
149 let heartbeat = c.hb_runtime.take();
150 let query = c.query_runtime.take();
151 let ingest = c.ingest_runtime.take();
152 c.already_init = true;
153
154 GlobalRuntimes::new(global, compact, heartbeat, query, ingest)
155});
156
157static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
158 Lazy::new(|| Mutex::new(ConfigRuntimes::default()));
159
160pub fn init_global_runtimes(options: &RuntimeOptions) {
166 static START: Once = Once::new();
167 START.call_once(move || {
168 let mut c = CONFIG_RUNTIMES.lock().unwrap();
169 assert!(!c.already_init, "Global runtimes already initialized");
170 c.global_runtime = Some(create_runtime(
171 "global",
172 "global-worker",
173 options.global_rt_size,
174 ));
175 c.compact_runtime = Some(create_runtime(
176 "compact",
177 "compact-worker",
178 options.compact_rt_size,
179 ));
180 c.hb_runtime = Some(create_runtime("heartbeat", "hb-worker", HB_WORKERS));
181 });
182}
183
184pub fn init_datanode_runtimes(options: &RuntimeOptions) {
190 static START: Once = Once::new();
191 START.call_once(move || {
192 let mut c = CONFIG_RUNTIMES.lock().unwrap();
193 assert!(!c.already_init, "Global runtimes already initialized");
194 c.query_runtime = Some(create_runtime(
195 "query",
196 "query-worker",
197 options.query_rt_size,
198 ));
199 c.ingest_runtime = Some(create_runtime(
200 "ingest",
201 "ingest-worker",
202 options.ingest_rt_size,
203 ));
204 });
205}
206
207macro_rules! define_global_runtime_spawn {
208 ($type: ident) => {
209 paste! {
210 #[doc = "Returns the global `" $type "` thread pool."]
211 pub fn [<$type _runtime>]() -> Runtime {
212 GLOBAL_RUNTIMES.[<$type _runtime>].clone()
213 }
214
215 #[doc = "Spawn a future and execute it in `" $type "` thread pool."]
216 pub fn [<spawn_ $type>]<F>(future: F) -> JoinHandle<F::Output>
217 where
218 F: Future + Send + 'static,
219 F::Output: Send + 'static,
220 {
221 GLOBAL_RUNTIMES.[<spawn_ $type>](future)
222 }
223
224 #[doc = "Run the blocking operation in `" $type "` thread pool."]
225 pub fn [<spawn_blocking_ $type>]<F, R>(future: F) -> JoinHandle<R>
226 where
227 F: FnOnce() -> R + Send + 'static,
228 R: Send + 'static,
229 {
230 GLOBAL_RUNTIMES.[<spawn_blocking_ $type>](future)
231 }
232
233 #[doc = "Run a future to complete in `" $type "` thread pool."]
234 pub fn [<block_on_ $type>]<F: Future>(future: F) -> F::Output {
235 GLOBAL_RUNTIMES.[<block_on_ $type>](future)
236 }
237 }
238 };
239}
240
241define_global_runtime_spawn!(global);
242define_global_runtime_spawn!(compact);
243define_global_runtime_spawn!(hb);
244define_global_runtime_spawn!(query);
245define_global_runtime_spawn!(ingest);
246
#[cfg(test)]
mod tests {
    use tokio_test::assert_ok;

    use super::*;

    // The default options must be derived from the detected CPU count.
    #[test]
    fn test_datanode_runtime_options_default() {
        let options = RuntimeOptions::default();
        let cpus = num_cpus::get();

        assert_eq!(cpus, options.global_rt_size);
        assert_eq!(usize::max(cpus / 2, 1), options.compact_rt_size);
        assert_eq!(usize::max(cpus.saturating_sub(1), 1), options.query_rt_size);
        assert_eq!(cpus, options.ingest_rt_size);
    }

    // Without dedicated runtimes, query and ingest must reuse the global runtime.
    #[test]
    fn test_datanode_runtimes_fallback_to_global_runtime() {
        let runtimes = GlobalRuntimes::new(
            Some(create_runtime("test-global", "test-global-worker", 1)),
            None,
            None,
            None,
            None,
        );

        assert_eq!("test-global", runtimes.global_runtime.name());
        assert_eq!("test-global", runtimes.query_runtime.name());
        assert_eq!("test-global", runtimes.ingest_runtime.name());
    }

    #[test]
    fn test_datanode_runtime_spawn_block_on() {
        let handle = spawn_query(async { 1 + 1 });
        assert_eq!(2, block_on_query(handle).unwrap());

        let handle = spawn_ingest(async { 2 + 2 });
        assert_eq!(4, block_on_ingest(handle).unwrap());
    }

    #[test]
    fn test_spawn_block_on() {
        let handle = spawn_global(async { 1 + 1 });
        assert_eq!(2, block_on_global(handle).unwrap());

        let handle = spawn_compact(async { 2 + 2 });
        assert_eq!(4, block_on_compact(handle).unwrap());

        let handle = spawn_hb(async { 4 + 4 });
        assert_eq!(8, block_on_hb(handle).unwrap());
    }

    // Generates a test that spawns an async task from inside a blocking task on the
    // given runtime and checks that the inner task's result is returned.
    macro_rules! define_spawn_blocking_test {
        ($type: ident) => {
            paste! {
                #[test]
                fn [<test_spawn_ $type _from_blocking>]() {
                    let runtime = [<$type _runtime>]();
                    let out = runtime.block_on(async move {
                        let inner = assert_ok!(
                            [<spawn_blocking_ $type>](move || {
                                [<spawn_ $type>](async move { "hello" })
                            }).await
                        );

                        assert_ok!(inner.await)
                    });

                    assert_eq!(out, "hello")
                }
            }
        };
    }

    define_spawn_blocking_test!(global);
    define_spawn_blocking_test!(compact);
    define_spawn_blocking_test!(hb);
    // The query and ingest helpers are generated by the same macro as the three above,
    // so they get the same spawn-from-blocking coverage.
    define_spawn_blocking_test!(query);
    define_spawn_blocking_test!(ingest);
}