1use std::sync::Arc;
16use std::sync::atomic::{AtomicI64, Ordering};
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use common_runtime::JoinHandle;
21use common_telemetry::info;
22use sysinfo::System;
23use tokio::time::sleep;
24
25use crate::cgroups::calculate_cpu_usage;
26use crate::{
27 get_cpu_limit_from_cgroups, get_cpu_usage_from_cgroups, get_memory_limit_from_cgroups,
28 get_memory_usage_from_cgroups,
29};
30
31pub fn get_total_cpu_millicores() -> i64 {
33 if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() {
35 cgroup_cpu_limit
36 } else {
37 num_cpus::get() as i64 * 1000
39 }
40}
41
42pub fn get_total_memory_bytes() -> i64 {
45 if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() {
47 cgroup_memory_limit
48 } else {
49 if sysinfo::IS_SUPPORTED_SYSTEM {
51 let mut sys_info = System::new();
52 sys_info.refresh_memory();
53 sys_info.total_memory() as i64
54 } else {
55 0
57 }
58 }
59}
60
61pub fn get_total_cpu_cores() -> usize {
64 cpu_cores(get_total_cpu_millicores())
65}
66
67fn cpu_cores(cpu_millicores: i64) -> usize {
68 ((cpu_millicores as f64) / 1_000.0).ceil() as usize
69}
70
71pub fn get_total_memory_readable() -> Option<ReadableSize> {
73 if get_total_memory_bytes() > 0 {
74 Some(ReadableSize(get_total_memory_bytes() as u64))
75 } else {
76 None
77 }
78}
79
80pub type ResourceStatRef = Arc<dyn ResourceStat + Send + Sync>;
82
83pub trait ResourceStat {
85 fn get_total_cpu_millicores(&self) -> i64;
87 fn get_total_memory_bytes(&self) -> i64;
89 fn get_cpu_usage_millicores(&self) -> i64;
91 fn get_memory_usage_bytes(&self) -> i64;
93}
94
95pub struct ResourceStatImpl {
97 cpu_usage_millicores: Arc<AtomicI64>,
98 last_cpu_usage_usecs: Arc<AtomicI64>,
99 calculate_interval: Duration,
100 handler: Option<JoinHandle<()>>,
101}
102
103impl Default for ResourceStatImpl {
104 fn default() -> Self {
105 Self {
106 cpu_usage_millicores: Arc::new(AtomicI64::new(0)),
107 last_cpu_usage_usecs: Arc::new(AtomicI64::new(0)),
108 calculate_interval: Duration::from_secs(5),
109 handler: None,
110 }
111 }
112}
113
114impl ResourceStatImpl {
115 pub fn start_collect_cpu_usage(&mut self) {
118 if self.handler.is_some() {
119 return;
120 }
121
122 let cpu_usage_millicores = self.cpu_usage_millicores.clone();
123 let last_cpu_usage_usecs = self.last_cpu_usage_usecs.clone();
124 let calculate_interval = self.calculate_interval;
125
126 let handler = common_runtime::spawn_global(async move {
127 info!(
128 "Starting to collect CPU usage periodically for every {} seconds",
129 calculate_interval.as_secs()
130 );
131 loop {
132 let current_cpu_usage_usecs = get_cpu_usage_from_cgroups();
133 if let Some(current_cpu_usage_usecs) = current_cpu_usage_usecs {
134 if last_cpu_usage_usecs.load(Ordering::Relaxed) == 0 {
136 last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed);
137 continue;
138 }
139 let cpu_usage = calculate_cpu_usage(
140 current_cpu_usage_usecs,
141 last_cpu_usage_usecs.load(Ordering::Relaxed),
142 calculate_interval.as_millis() as i64,
143 );
144 cpu_usage_millicores.store(cpu_usage, Ordering::Relaxed);
145 last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed);
146 }
147 sleep(calculate_interval).await;
148 }
149 });
150
151 self.handler = Some(handler);
152 }
153}
154
155impl ResourceStat for ResourceStatImpl {
156 fn get_total_cpu_millicores(&self) -> i64 {
158 get_total_cpu_millicores()
159 }
160
161 fn get_total_memory_bytes(&self) -> i64 {
163 get_total_memory_bytes()
164 }
165
166 fn get_cpu_usage_millicores(&self) -> i64 {
168 self.cpu_usage_millicores.load(Ordering::Relaxed)
169 }
170
171 fn get_memory_usage_bytes(&self) -> i64 {
174 get_memory_usage_from_cgroups().unwrap_or_default()
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181
182 #[test]
183 fn test_get_total_cpu_cores() {
184 assert!(get_total_cpu_cores() > 0);
185 assert_eq!(cpu_cores(1), 1);
186 assert_eq!(cpu_cores(100), 1);
187 assert_eq!(cpu_cores(500), 1);
188 assert_eq!(cpu_cores(1000), 1);
189 assert_eq!(cpu_cores(1100), 2);
190 assert_eq!(cpu_cores(1900), 2);
191 assert_eq!(cpu_cores(10_000), 10);
192 }
193
194 #[test]
195 fn test_get_total_memory_readable() {
196 assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0));
197 }
198}