common_memory_manager/
manager.rs1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use snafu::ensure;
19use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
20
21use crate::error::{
22 MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
23};
24use crate::granularity::PermitGranularity;
25use crate::guard::MemoryGuard;
26use crate::policy::OnExhaustedPolicy;
27
/// Sink for the memory-accounting metrics published by the memory manager.
///
/// Implementations must be cheaply cloneable and shareable across threads
/// (`Clone + Send + Sync + 'static`), because the quota structs in this file
/// store a copy of the metrics value and are themselves `Clone`.
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
    /// Records the configured memory limit in bytes.
    ///
    /// In this file it is called once at construction: with `0` for an
    /// unlimited manager, otherwise with the permit-aligned limit.
    fn set_limit(&self, bytes: i64);
    /// Records the number of bytes currently accounted as in use.
    fn set_in_use(&self, bytes: i64);
    /// Counts one failed acquisition, labelled with `reason`.
    ///
    /// The only reason emitted from this file is `"try_acquire"`.
    fn inc_exhausted(&self, reason: &str);
}
35
/// Entry point for reserving memory against an optional byte limit.
///
/// The manager is a thin wrapper around a [`MemoryQuotaState`]. The state
/// keeps its semaphore / usage counter behind `Arc`, so clones of a manager
/// account against the same underlying quota.
#[derive(Clone)]
pub struct MemoryManager<M: MemoryMetrics> {
    // Either the unlimited usage counter or the semaphore-backed limited quota.
    quota: MemoryQuotaState<M>,
}
41
42impl<M: MemoryMetrics + Default> Default for MemoryManager<M> {
43 fn default() -> Self {
44 Self::new(0, M::default())
45 }
46}
47
/// Semaphore-backed quota used when a non-zero memory limit is configured.
///
/// Memory is accounted in permits; `granularity` converts between bytes and
/// permits in both directions.
#[derive(Clone)]
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
    // Shared semaphore, created with `limit_permits` permits.
    pub(crate) semaphore: Arc<Semaphore>,
    // Total number of permits the quota was created with.
    pub(crate) limit_permits: u32,
    // Byte <-> permit conversion used for every request.
    pub(crate) granularity: PermitGranularity,
    // Sink that receives in-use and exhaustion updates.
    pub(crate) metrics: M,
}
55
/// Usage tracker used when no memory limit is configured (limit of `0`).
///
/// Nothing is ever refused in this mode; the struct only keeps a running
/// byte count so that usage can still be reported.
#[derive(Clone)]
pub(crate) struct UnlimitedMemoryQuota<M: MemoryMetrics> {
    // Shared running total of bytes currently accounted as in use.
    pub(crate) current_bytes: Arc<AtomicU64>,
    // Sink that receives the in-use total after every change.
    pub(crate) metrics: M,
}
61
/// The two accounting modes a [`MemoryManager`] can operate in.
#[derive(Clone)]
pub(crate) enum MemoryQuotaState<M: MemoryMetrics> {
    /// No limit is enforced; usage is only counted.
    Unlimited(UnlimitedMemoryQuota<M>),
    /// A limit is enforced through a semaphore of permits.
    Limited(MemoryQuota<M>),
}
67
68impl<M: MemoryMetrics> MemoryManager<M> {
69 pub fn new(limit_bytes: u64, metrics: M) -> Self {
72 Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
73 }
74
75 pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
77 if limit_bytes == 0 {
78 metrics.set_limit(0);
79 return Self {
80 quota: MemoryQuotaState::Unlimited(UnlimitedMemoryQuota {
81 current_bytes: Arc::new(AtomicU64::new(0)),
82 metrics,
83 }),
84 };
85 }
86
87 let limit_permits = granularity.bytes_to_permits(limit_bytes);
88 let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
89 metrics.set_limit(limit_aligned_bytes as i64);
90
91 Self {
92 quota: MemoryQuotaState::Limited(MemoryQuota {
93 semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
94 limit_permits,
95 granularity,
96 metrics,
97 }),
98 }
99 }
100
101 pub fn limit_bytes(&self) -> u64 {
103 match &self.quota {
104 MemoryQuotaState::Unlimited(_) => 0,
105 MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.limit_permits),
106 }
107 }
108
109 pub fn used_bytes(&self) -> u64 {
111 match &self.quota {
112 MemoryQuotaState::Unlimited(quota) => quota.current_bytes.load(Ordering::Acquire),
113 MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.used_permits()),
114 }
115 }
116
117 pub fn available_bytes(&self) -> u64 {
121 match &self.quota {
122 MemoryQuotaState::Unlimited(_) => u64::MAX,
123 MemoryQuotaState::Limited(quota) => {
124 quota.permits_to_bytes(quota.available_permits_clamped())
125 }
126 }
127 }
128
129 pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<M>> {
135 match &self.quota {
136 MemoryQuotaState::Unlimited(quota) => Ok(MemoryGuard::unlimited(quota.clone(), bytes)),
137 MemoryQuotaState::Limited(quota) => {
138 let permits = quota.bytes_to_permits(bytes);
139
140 ensure!(
141 permits <= quota.limit_permits,
142 MemoryLimitExceededSnafu {
143 requested_bytes: bytes,
144 limit_bytes: self.limit_bytes()
145 }
146 );
147
148 let permit = quota
149 .semaphore
150 .clone()
151 .acquire_many_owned(permits)
152 .await
153 .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
154 quota.update_in_use_metric();
155 Ok(MemoryGuard::limited(quota.clone(), permit))
156 }
157 }
158 }
159
160 pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<M>> {
162 match &self.quota {
163 MemoryQuotaState::Unlimited(quota) => {
164 Some(MemoryGuard::unlimited(quota.clone(), bytes))
165 }
166 MemoryQuotaState::Limited(quota) => {
167 let permits = quota.bytes_to_permits(bytes);
168
169 match quota.semaphore.clone().try_acquire_many_owned(permits) {
170 Ok(permit) => {
171 quota.update_in_use_metric();
172 Some(MemoryGuard::limited(quota.clone(), permit))
173 }
174 Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
175 quota.metrics.inc_exhausted("try_acquire");
176 None
177 }
178 }
179 }
180 }
181 }
182
183 pub async fn acquire_with_policy(
193 &self,
194 bytes: u64,
195 policy: OnExhaustedPolicy,
196 ) -> Result<MemoryGuard<M>> {
197 match policy {
198 OnExhaustedPolicy::Wait { timeout } => {
199 match tokio::time::timeout(timeout, self.acquire(bytes)).await {
200 Ok(Ok(guard)) => Ok(guard),
201 Ok(Err(e)) => Err(e),
202 Err(_elapsed) => {
203 MemoryAcquireTimeoutSnafu {
205 requested_bytes: bytes,
206 waited: timeout,
207 }
208 .fail()
209 }
210 }
211 }
212 OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
213 MemoryLimitExceededSnafu {
214 requested_bytes: bytes,
215 limit_bytes: self.limit_bytes(),
216 }
217 .build()
218 }),
219 }
220 }
221}
222
223impl<M: MemoryMetrics> MemoryQuota<M> {
224 pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
225 self.granularity.bytes_to_permits(bytes)
226 }
227
228 pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
229 self.granularity.permits_to_bytes(permits)
230 }
231
232 pub(crate) fn used_permits(&self) -> u32 {
233 self.limit_permits
234 .saturating_sub(self.available_permits_clamped())
235 }
236
237 pub(crate) fn available_permits_clamped(&self) -> u32 {
238 self.semaphore
239 .available_permits()
240 .min(self.limit_permits as usize) as u32
241 }
242
243 pub(crate) fn update_in_use_metric(&self) {
244 let bytes = self.permits_to_bytes(self.used_permits());
245 self.metrics.set_in_use(bytes as i64);
246 }
247
248 pub(crate) fn release_permit(&self, permit: OwnedSemaphorePermit) {
249 drop(permit);
250 self.update_in_use_metric();
251 }
252}
253
254impl<M: MemoryMetrics> UnlimitedMemoryQuota<M> {
255 pub(crate) fn add_in_use(&self, bytes: u64) {
256 if bytes == 0 {
257 return;
258 }
259
260 let previous = self
261 .current_bytes
262 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
263 Some(current.saturating_add(bytes))
264 })
265 .unwrap();
266 let new_total = previous.saturating_add(bytes);
267 debug_assert!(
268 new_total >= previous,
269 "unlimited memory usage counter overflowed"
270 );
271 self.metrics.set_in_use(new_total as i64);
272 }
273
274 pub(crate) fn sub_in_use(&self, bytes: u64) {
275 if bytes == 0 {
276 return;
277 }
278
279 let previous = self
280 .current_bytes
281 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
282 Some(current.saturating_sub(bytes))
283 })
284 .unwrap();
285 debug_assert!(
286 previous >= bytes,
287 "unlimited memory usage counter underflowed: current={previous}, release={bytes}"
288 );
289 let new_total = previous.saturating_sub(bytes);
290 self.metrics.set_in_use(new_total as i64);
291 }
292}