common_runtime/
global.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Global runtimes
16use std::future::Future;
17use std::sync::{Mutex, Once};
18
19use common_telemetry::info;
20use once_cell::sync::Lazy;
21use paste::paste;
22use serde::{Deserialize, Serialize};
23
24use crate::runtime::{BuilderBuild, RuntimeTrait};
25use crate::{Builder, JoinHandle, Runtime};
26
27const GLOBAL_WORKERS: usize = 8;
28const COMPACT_WORKERS: usize = 4;
29const HB_WORKERS: usize = 2;
30
31/// The options for the global runtimes.
32#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
33pub struct RuntimeOptions {
34    /// The number of threads for the global default runtime.
35    pub global_rt_size: usize,
36    /// The number of threads to execute the runtime for compact operations.
37    pub compact_rt_size: usize,
38}
39
40impl Default for RuntimeOptions {
41    fn default() -> Self {
42        let cpus = num_cpus::get();
43        Self {
44            global_rt_size: cpus,
45            compact_rt_size: usize::max(cpus / 2, 1),
46        }
47    }
48}
49
50pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
51    info!("Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}.");
52    Builder::default()
53        .runtime_name(runtime_name)
54        .thread_name(thread_name)
55        .worker_threads(worker_threads)
56        .build()
57        .expect("Fail to create runtime")
58}
59
60struct GlobalRuntimes {
61    global_runtime: Runtime,
62    compact_runtime: Runtime,
63    hb_runtime: Runtime,
64}
65
66macro_rules! define_spawn {
67    ($type: ident) => {
68        paste! {
69
70            fn [<spawn_ $type>]<F>(&self, future: F) -> JoinHandle<F::Output>
71            where
72                F: Future + Send + 'static,
73                F::Output: Send + 'static,
74            {
75                self.[<$type _runtime>].spawn(future)
76            }
77
78            fn [<spawn_blocking_ $type>]<F, R>(&self, future: F) ->  JoinHandle<R>
79            where
80                F: FnOnce() -> R + Send + 'static,
81                R: Send + 'static,
82            {
83                self.[<$type _runtime>].spawn_blocking(future)
84            }
85
86            fn [<block_on_ $type>]<F: Future>(&self, future: F) -> F::Output {
87                self.[<$type _runtime>].block_on(future)
88            }
89        }
90    };
91}
92
93impl GlobalRuntimes {
94    define_spawn!(global);
95    define_spawn!(compact);
96    define_spawn!(hb);
97
98    fn new(global: Option<Runtime>, compact: Option<Runtime>, heartbeat: Option<Runtime>) -> Self {
99        Self {
100            global_runtime: global
101                .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)),
102            compact_runtime: compact
103                .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
104            hb_runtime: heartbeat
105                .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
106        }
107    }
108}
109
110#[derive(Default)]
111struct ConfigRuntimes {
112    global_runtime: Option<Runtime>,
113    compact_runtime: Option<Runtime>,
114    hb_runtime: Option<Runtime>,
115    already_init: bool,
116}
117
118static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
119    let mut c = CONFIG_RUNTIMES.lock().unwrap();
120    let global = c.global_runtime.take();
121    let compact = c.compact_runtime.take();
122    let heartbeat = c.hb_runtime.take();
123    c.already_init = true;
124
125    GlobalRuntimes::new(global, compact, heartbeat)
126});
127
128static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
129    Lazy::new(|| Mutex::new(ConfigRuntimes::default()));
130
131/// Initialize the global runtimes
132///
133/// # Panics
134/// Panics when the global runtimes are already initialized.
135/// You should call this function before using any runtime functions.
136pub fn init_global_runtimes(options: &RuntimeOptions) {
137    static START: Once = Once::new();
138    START.call_once(move || {
139        let mut c = CONFIG_RUNTIMES.lock().unwrap();
140        assert!(!c.already_init, "Global runtimes already initialized");
141        c.global_runtime = Some(create_runtime(
142            "global",
143            "global-worker",
144            options.global_rt_size,
145        ));
146        c.compact_runtime = Some(create_runtime(
147            "compact",
148            "compact-worker",
149            options.compact_rt_size,
150        ));
151        c.hb_runtime = Some(create_runtime("hreartbeat", "hb-worker", HB_WORKERS));
152    });
153}
154
155macro_rules! define_global_runtime_spawn {
156    ($type: ident) => {
157        paste! {
158            #[doc = "Returns the global `" $type "` thread pool."]
159            pub fn [<$type _runtime>]() -> Runtime {
160                GLOBAL_RUNTIMES.[<$type _runtime>].clone()
161            }
162
163            #[doc = "Spawn a future and execute it in `" $type "` thread pool."]
164            pub fn [<spawn_ $type>]<F>(future: F) -> JoinHandle<F::Output>
165            where
166                F: Future + Send + 'static,
167                F::Output: Send + 'static,
168            {
169                GLOBAL_RUNTIMES.[<spawn_ $type>](future)
170            }
171
172            #[doc = "Run the blocking operation in `" $type "` thread pool."]
173            pub fn [<spawn_blocking_ $type>]<F, R>(future: F) ->  JoinHandle<R>
174            where
175                F: FnOnce() -> R + Send + 'static,
176                R: Send + 'static,
177            {
178                GLOBAL_RUNTIMES.[<spawn_blocking_ $type>](future)
179            }
180
181            #[doc = "Run a future to complete in `" $type "` thread pool."]
182            pub fn [<block_on_ $type>]<F: Future>(future: F) -> F::Output {
183                GLOBAL_RUNTIMES.[<block_on_ $type>](future)
184            }
185        }
186    };
187}
188
189define_global_runtime_spawn!(global);
190define_global_runtime_spawn!(compact);
191define_global_runtime_spawn!(hb);
192
193#[cfg(test)]
194mod tests {
195    use tokio_test::assert_ok;
196
197    use super::*;
198
199    #[test]
200    fn test_spawn_block_on() {
201        let handle = spawn_global(async { 1 + 1 });
202        assert_eq!(2, block_on_global(handle).unwrap());
203
204        let handle = spawn_compact(async { 2 + 2 });
205        assert_eq!(4, block_on_compact(handle).unwrap());
206
207        let handle = spawn_hb(async { 4 + 4 });
208        assert_eq!(8, block_on_hb(handle).unwrap());
209    }
210
211    macro_rules! define_spawn_blocking_test {
212        ($type: ident) => {
213            paste! {
214                #[test]
215                fn [<test_spawn_ $type _from_blocking>]() {
216                    let runtime = [<$type _runtime>]();
217                    let out = runtime.block_on(async move {
218                        let inner = assert_ok!(
219                            [<spawn_blocking_  $type>](move || {
220                                [<spawn_ $type>](async move { "hello" })
221                            }).await
222                        );
223
224                        assert_ok!(inner.await)
225                    });
226
227                    assert_eq!(out, "hello")
228                }
229            }
230        };
231    }
232
233    define_spawn_blocking_test!(global);
234    define_spawn_blocking_test!(compact);
235    define_spawn_blocking_test!(hb);
236}