common_runtime/
global.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Global runtimes
16use std::future::Future;
17use std::sync::{Mutex, Once};
18
19use common_telemetry::info;
20use once_cell::sync::Lazy;
21use paste::paste;
22use serde::{Deserialize, Serialize};
23
24use crate::runtime::{BuilderBuild, RuntimeTrait};
25use crate::{Builder, JoinHandle, Runtime};
26
/// Worker thread count of the fallback `global` runtime that
/// `GlobalRuntimes::new` builds when no configured runtime was supplied.
const GLOBAL_WORKERS: usize = 8;
/// Worker thread count of the fallback `compact` runtime that
/// `GlobalRuntimes::new` builds when no configured runtime was supplied.
const COMPACT_WORKERS: usize = 4;
/// Worker thread count of the heartbeat runtime. Used both for the fallback
/// runtime in `GlobalRuntimes::new` and by `init_global_runtimes`.
const HB_WORKERS: usize = 2;
30
/// The options for the global runtimes.
///
/// Passed to `init_global_runtimes`, which sizes the `global` and `compact`
/// runtimes from these fields. The heartbeat runtime has no option here; its
/// size comes from the `HB_WORKERS` constant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RuntimeOptions {
    /// The number of worker threads for the global default runtime.
    pub global_rt_size: usize,
    /// The number of worker threads for the runtime that executes compact operations.
    pub compact_rt_size: usize,
}
39
40impl Default for RuntimeOptions {
41    fn default() -> Self {
42        let cpus = num_cpus::get();
43        Self {
44            global_rt_size: cpus,
45            compact_rt_size: usize::max(cpus / 2, 1),
46        }
47    }
48}
49
50pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
51    info!(
52        "Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}."
53    );
54    Builder::default()
55        .runtime_name(runtime_name)
56        .thread_name(thread_name)
57        .worker_threads(worker_threads)
58        .build()
59        .expect("Fail to create runtime")
60}
61
/// The set of runtimes held by the `GLOBAL_RUNTIMES` static.
struct GlobalRuntimes {
    /// Runtime backing `global_runtime()`, `spawn_global` and friends.
    global_runtime: Runtime,
    /// Runtime backing `compact_runtime()`, `spawn_compact` and friends.
    compact_runtime: Runtime,
    /// Runtime backing `hb_runtime()`, `spawn_hb` and friends.
    hb_runtime: Runtime,
}
67
// Generates, for the `<name>_runtime` field of `GlobalRuntimes`, the private
// forwarding methods `spawn_<name>`, `spawn_blocking_<name>` and
// `block_on_<name>`.
macro_rules! define_spawn {
    ($name: ident) => {
        paste! {

            // Forwards to `spawn` on the `<name>_runtime` field.
            fn [<spawn_ $name>]<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
            where
                Fut: Future + Send + 'static,
                Fut::Output: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn(fut)
            }

            // Forwards to `spawn_blocking` on the `<name>_runtime` field.
            fn [<spawn_blocking_ $name>]<Func, Ret>(&self, func: Func) -> JoinHandle<Ret>
            where
                Func: FnOnce() -> Ret + Send + 'static,
                Ret: Send + 'static,
            {
                let runtime = &self.[<$name _runtime>];
                runtime.spawn_blocking(func)
            }

            // Forwards to `block_on` on the `<name>_runtime` field.
            fn [<block_on_ $name>]<Fut: Future>(&self, fut: Fut) -> Fut::Output {
                let runtime = &self.[<$name _runtime>];
                runtime.block_on(fut)
            }
        }
    };
}
94
95impl GlobalRuntimes {
96    define_spawn!(global);
97    define_spawn!(compact);
98    define_spawn!(hb);
99
100    fn new(global: Option<Runtime>, compact: Option<Runtime>, heartbeat: Option<Runtime>) -> Self {
101        Self {
102            global_runtime: global
103                .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)),
104            compact_runtime: compact
105                .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
106            hb_runtime: heartbeat
107                .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
108        }
109    }
110}
111
/// Staging area for the runtimes built by `init_global_runtimes`.
///
/// Each slot is emptied with `take` when `GLOBAL_RUNTIMES` is first
/// evaluated; a slot that is still `None` at that point makes
/// `GlobalRuntimes::new` build a fallback runtime instead.
#[derive(Default)]
struct ConfigRuntimes {
    /// Configured `global` runtime, if any.
    global_runtime: Option<Runtime>,
    /// Configured `compact` runtime, if any.
    compact_runtime: Option<Runtime>,
    /// Configured heartbeat runtime, if any.
    hb_runtime: Option<Runtime>,
    /// Set to `true` when `GLOBAL_RUNTIMES` is evaluated;
    /// `init_global_runtimes` asserts that it is still `false`.
    already_init: bool,
}
119
120static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
121    let mut c = CONFIG_RUNTIMES.lock().unwrap();
122    let global = c.global_runtime.take();
123    let compact = c.compact_runtime.take();
124    let heartbeat = c.hb_runtime.take();
125    c.already_init = true;
126
127    GlobalRuntimes::new(global, compact, heartbeat)
128});
129
130static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
131    Lazy::new(|| Mutex::new(ConfigRuntimes::default()));
132
133/// Initialize the global runtimes
134///
135/// # Panics
136/// Panics when the global runtimes are already initialized.
137/// You should call this function before using any runtime functions.
138pub fn init_global_runtimes(options: &RuntimeOptions) {
139    static START: Once = Once::new();
140    START.call_once(move || {
141        let mut c = CONFIG_RUNTIMES.lock().unwrap();
142        assert!(!c.already_init, "Global runtimes already initialized");
143        c.global_runtime = Some(create_runtime(
144            "global",
145            "global-worker",
146            options.global_rt_size,
147        ));
148        c.compact_runtime = Some(create_runtime(
149            "compact",
150            "compact-worker",
151            options.compact_rt_size,
152        ));
153        c.hb_runtime = Some(create_runtime("hreartbeat", "hb-worker", HB_WORKERS));
154    });
155}
156
157macro_rules! define_global_runtime_spawn {
158    ($type: ident) => {
159        paste! {
160            #[doc = "Returns the global `" $type "` thread pool."]
161            pub fn [<$type _runtime>]() -> Runtime {
162                GLOBAL_RUNTIMES.[<$type _runtime>].clone()
163            }
164
165            #[doc = "Spawn a future and execute it in `" $type "` thread pool."]
166            pub fn [<spawn_ $type>]<F>(future: F) -> JoinHandle<F::Output>
167            where
168                F: Future + Send + 'static,
169                F::Output: Send + 'static,
170            {
171                GLOBAL_RUNTIMES.[<spawn_ $type>](future)
172            }
173
174            #[doc = "Run the blocking operation in `" $type "` thread pool."]
175            pub fn [<spawn_blocking_ $type>]<F, R>(future: F) ->  JoinHandle<R>
176            where
177                F: FnOnce() -> R + Send + 'static,
178                R: Send + 'static,
179            {
180                GLOBAL_RUNTIMES.[<spawn_blocking_ $type>](future)
181            }
182
183            #[doc = "Run a future to complete in `" $type "` thread pool."]
184            pub fn [<block_on_ $type>]<F: Future>(future: F) -> F::Output {
185                GLOBAL_RUNTIMES.[<block_on_ $type>](future)
186            }
187        }
188    };
189}
190
191define_global_runtime_spawn!(global);
192define_global_runtime_spawn!(compact);
193define_global_runtime_spawn!(hb);
194
#[cfg(test)]
mod tests {
    use tokio_test::assert_ok;

    use super::*;

    // Each global runtime must be able to spawn a task and then block on
    // its join handle.
    #[test]
    fn test_spawn_block_on() {
        let global_task = spawn_global(async { 1 + 1 });
        let global_result = block_on_global(global_task).unwrap();
        assert_eq!(2, global_result);

        let compact_task = spawn_compact(async { 2 + 2 });
        let compact_result = block_on_compact(compact_task).unwrap();
        assert_eq!(4, compact_result);

        let hb_task = spawn_hb(async { 4 + 4 });
        let hb_result = block_on_hb(hb_task).unwrap();
        assert_eq!(8, hb_result);
    }

    // Generates a test that spawns an async task from inside a blocking task
    // on the same runtime and checks the nested task's output.
    macro_rules! define_spawn_blocking_test {
        ($name: ident) => {
            paste! {
                #[test]
                fn [<test_spawn_ $name _from_blocking>]() {
                    let rt = [<$name _runtime>]();
                    let greeting = rt.block_on(async move {
                        let blocking_task = [<spawn_blocking_ $name>](move || {
                            [<spawn_ $name>](async move { "hello" })
                        });
                        // The blocking task yields the join handle of the nested task.
                        let nested_task = assert_ok!(blocking_task.await);

                        assert_ok!(nested_task.await)
                    });

                    assert_eq!(greeting, "hello");
                }
            }
        };
    }

    define_spawn_blocking_test!(global);
    define_spawn_blocking_test!(compact);
    define_spawn_blocking_test!(hb);
}