common_memory_manager/
guard.rs1use std::{fmt, mem};
16
17use snafu::ensure;
18use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
19
20use crate::error::{
21 MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
22};
23use crate::manager::{MemoryMetrics, MemoryQuota, UnlimitedMemoryQuota};
24use crate::policy::OnExhaustedPolicy;
25
26pub struct MemoryGuard<M: MemoryMetrics> {
28 pub(crate) state: GuardState<M>,
29}
30
31pub(crate) enum GuardState<M: MemoryMetrics> {
32 Released,
33 Unlimited {
34 quota: UnlimitedMemoryQuota<M>,
35 granted_bytes: u64,
36 },
37 Limited {
38 quota: MemoryQuota<M>,
39 permit: OwnedSemaphorePermit,
40 },
41}
42
43impl<M: MemoryMetrics> GuardState<M> {
44 fn release(self) {
45 match self {
46 GuardState::Released => {}
47 GuardState::Unlimited {
48 quota,
49 granted_bytes,
50 } => {
51 quota.sub_in_use(granted_bytes);
52 }
53 GuardState::Limited { quota, permit } => {
54 quota.release_permit(permit);
55 }
56 }
57 }
58}
59
60impl<M: MemoryMetrics> MemoryGuard<M> {
61 pub(crate) fn unlimited(quota: UnlimitedMemoryQuota<M>, bytes: u64) -> Self {
62 quota.add_in_use(bytes);
63 Self {
64 state: GuardState::Unlimited {
65 quota,
66 granted_bytes: bytes,
67 },
68 }
69 }
70
71 pub(crate) fn limited(quota: MemoryQuota<M>, permit: OwnedSemaphorePermit) -> Self {
72 Self {
73 state: GuardState::Limited { quota, permit },
74 }
75 }
76
77 pub fn granted_bytes(&self) -> u64 {
79 match &self.state {
80 GuardState::Released => 0,
81 GuardState::Unlimited { granted_bytes, .. } => *granted_bytes,
82 GuardState::Limited { quota, permit } => {
83 quota.permits_to_bytes(permit.num_permits() as u32)
84 }
85 }
86 }
87
88 pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> {
96 if bytes == 0 {
97 return Ok(());
98 }
99
100 match &mut self.state {
101 GuardState::Released => {
102 debug_assert!(false, "released memory guard state should not be reused");
103 Ok(())
104 }
105 GuardState::Unlimited {
106 quota,
107 granted_bytes,
108 } => {
109 quota.add_in_use(bytes);
110 *granted_bytes = granted_bytes.saturating_add(bytes);
111 Ok(())
112 }
113 GuardState::Limited { quota, permit } => {
114 let additional_permits = quota.bytes_to_permits(bytes);
115 let current_permits = permit.num_permits() as u32;
116
117 ensure!(
118 current_permits.saturating_add(additional_permits) <= quota.limit_permits,
119 MemoryLimitExceededSnafu {
120 requested_bytes: bytes,
121 limit_bytes: quota.permits_to_bytes(quota.limit_permits)
122 }
123 );
124
125 let additional_permit = quota
126 .semaphore
127 .clone()
128 .acquire_many_owned(additional_permits)
129 .await
130 .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
131
132 permit.merge(additional_permit);
133 quota.update_in_use_metric();
134 Ok(())
135 }
136 }
137 }
138
139 pub fn try_acquire_additional(&mut self, bytes: u64) -> bool {
144 if bytes == 0 {
145 return true;
146 }
147
148 match &mut self.state {
149 GuardState::Released => {
150 debug_assert!(false, "released memory guard state should not be reused");
151 false
152 }
153 GuardState::Unlimited {
154 quota,
155 granted_bytes,
156 } => {
157 quota.add_in_use(bytes);
158 *granted_bytes = granted_bytes.saturating_add(bytes);
159 true
160 }
161 GuardState::Limited { quota, permit } => {
162 let additional_permits = quota.bytes_to_permits(bytes);
163
164 match quota
165 .semaphore
166 .clone()
167 .try_acquire_many_owned(additional_permits)
168 {
169 Ok(additional_permit) => {
170 permit.merge(additional_permit);
171 quota.update_in_use_metric();
172 true
173 }
174 Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
175 quota.metrics.inc_exhausted("try_acquire_additional");
176 false
177 }
178 }
179 }
180 }
181 }
182
183 pub async fn acquire_additional_with_policy(
193 &mut self,
194 bytes: u64,
195 policy: OnExhaustedPolicy,
196 ) -> Result<()> {
197 match policy {
198 OnExhaustedPolicy::Wait { timeout } => {
199 match tokio::time::timeout(timeout, self.acquire_additional(bytes)).await {
200 Ok(Ok(())) => Ok(()),
201 Ok(Err(e)) => Err(e),
202 Err(_elapsed) => MemoryAcquireTimeoutSnafu {
203 requested_bytes: bytes,
204 waited: timeout,
205 }
206 .fail(),
207 }
208 }
209 OnExhaustedPolicy::Fail => {
210 if self.try_acquire_additional(bytes) {
211 Ok(())
212 } else {
213 MemoryLimitExceededSnafu {
214 requested_bytes: bytes,
215 limit_bytes: match &self.state {
216 GuardState::Released => 0,
217 GuardState::Unlimited { .. } => 0, GuardState::Limited { quota, .. } => {
219 quota.permits_to_bytes(quota.limit_permits)
220 }
221 },
222 }
223 .fail()
224 }
225 }
226 }
227 }
228
229 pub fn release_partial(&mut self, bytes: u64) -> bool {
233 if bytes == 0 {
234 return true;
235 }
236
237 match &mut self.state {
238 GuardState::Released => true,
239 GuardState::Unlimited {
240 quota,
241 granted_bytes,
242 } => {
243 if bytes > *granted_bytes {
244 return false;
245 }
246
247 quota.sub_in_use(bytes);
248 *granted_bytes = granted_bytes.saturating_sub(bytes);
249 true
250 }
251 GuardState::Limited { quota, permit } => {
252 let release_permits = quota.bytes_to_permits(bytes);
253
254 match permit.split(release_permits as usize) {
255 Some(released_permit) => {
256 quota.release_permit(released_permit);
257 true
258 }
259 None => false,
260 }
261 }
262 }
263 }
264}
265
266impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
267 fn drop(&mut self) {
268 mem::replace(&mut self.state, GuardState::Released).release();
269 }
270}
271
272impl<M: MemoryMetrics> fmt::Debug for MemoryGuard<M> {
273 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274 f.debug_struct("MemoryGuard")
275 .field("granted_bytes", &self.granted_bytes())
276 .finish()
277 }
278}