1use std::sync::Arc;
16use std::sync::atomic::{AtomicI64, Ordering};
17use std::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use common_runtime::JoinHandle;
21use common_telemetry::info;
22use sysinfo::System;
23use tokio::time::sleep;
24
25use crate::cgroups::calculate_cpu_usage;
26use crate::{
27 get_cpu_limit_from_cgroups, get_cpu_usage_from_cgroups, get_memory_limit_from_cgroups,
28 get_memory_usage_from_cgroups,
29};
30
31pub fn get_total_cpu_millicores() -> i64 {
33 if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() {
35 cgroup_cpu_limit
36 } else {
37 num_cpus::get() as i64 * 1000
39 }
40}
41
42pub fn get_total_memory_bytes() -> i64 {
45 if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() {
47 cgroup_memory_limit
48 } else {
49 if sysinfo::IS_SUPPORTED_SYSTEM {
51 let mut sys_info = System::new();
52 sys_info.refresh_memory();
53 sys_info.total_memory() as i64
54 } else {
55 0
57 }
58 }
59}
60
61pub fn get_total_cpu_cores() -> usize {
64 ((get_total_cpu_millicores() as f64) / 1000.0).round() as usize
65}
66
67pub fn get_total_memory_readable() -> Option<ReadableSize> {
69 if get_total_memory_bytes() > 0 {
70 Some(ReadableSize(get_total_memory_bytes() as u64))
71 } else {
72 None
73 }
74}
75
76pub type ResourceStatRef = Arc<dyn ResourceStat + Send + Sync>;
78
79pub trait ResourceStat {
81 fn get_total_cpu_millicores(&self) -> i64;
83 fn get_total_memory_bytes(&self) -> i64;
85 fn get_cpu_usage_millicores(&self) -> i64;
87 fn get_memory_usage_bytes(&self) -> i64;
89}
90
91pub struct ResourceStatImpl {
93 cpu_usage_millicores: Arc<AtomicI64>,
94 last_cpu_usage_usecs: Arc<AtomicI64>,
95 calculate_interval: Duration,
96 handler: Option<JoinHandle<()>>,
97}
98
99impl Default for ResourceStatImpl {
100 fn default() -> Self {
101 Self {
102 cpu_usage_millicores: Arc::new(AtomicI64::new(0)),
103 last_cpu_usage_usecs: Arc::new(AtomicI64::new(0)),
104 calculate_interval: Duration::from_secs(5),
105 handler: None,
106 }
107 }
108}
109
110impl ResourceStatImpl {
111 pub fn start_collect_cpu_usage(&mut self) {
114 if self.handler.is_some() {
115 return;
116 }
117
118 let cpu_usage_millicores = self.cpu_usage_millicores.clone();
119 let last_cpu_usage_usecs = self.last_cpu_usage_usecs.clone();
120 let calculate_interval = self.calculate_interval;
121
122 let handler = common_runtime::spawn_global(async move {
123 info!(
124 "Starting to collect CPU usage periodically for every {} seconds",
125 calculate_interval.as_secs()
126 );
127 loop {
128 let current_cpu_usage_usecs = get_cpu_usage_from_cgroups();
129 if let Some(current_cpu_usage_usecs) = current_cpu_usage_usecs {
130 if last_cpu_usage_usecs.load(Ordering::Relaxed) == 0 {
132 last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed);
133 continue;
134 }
135 let cpu_usage = calculate_cpu_usage(
136 current_cpu_usage_usecs,
137 last_cpu_usage_usecs.load(Ordering::Relaxed),
138 calculate_interval.as_millis() as i64,
139 );
140 cpu_usage_millicores.store(cpu_usage, Ordering::Relaxed);
141 last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed);
142 }
143 sleep(calculate_interval).await;
144 }
145 });
146
147 self.handler = Some(handler);
148 }
149}
150
151impl ResourceStat for ResourceStatImpl {
152 fn get_total_cpu_millicores(&self) -> i64 {
154 get_total_cpu_millicores()
155 }
156
157 fn get_total_memory_bytes(&self) -> i64 {
159 get_total_memory_bytes()
160 }
161
162 fn get_cpu_usage_millicores(&self) -> i64 {
164 self.cpu_usage_millicores.load(Ordering::Relaxed)
165 }
166
167 fn get_memory_usage_bytes(&self) -> i64 {
170 get_memory_usage_from_cgroups().unwrap_or_default()
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 #[test]
179 fn test_get_total_cpu_cores() {
180 assert!(get_total_cpu_cores() > 0);
181 }
182
183 #[test]
184 fn test_get_total_memory_readable() {
185 assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0));
186 }
187}