common_memory_manager/
guard.rs1use std::{fmt, mem};
16
17use common_telemetry::debug;
18use snafu::ensure;
19use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
20
21use crate::error::{
22 MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
23};
24use crate::manager::{MemoryMetrics, MemoryQuota};
25use crate::policy::OnExhaustedPolicy;
26
27pub struct MemoryGuard<M: MemoryMetrics> {
29 pub(crate) state: GuardState<M>,
30}
31
32pub(crate) enum GuardState<M: MemoryMetrics> {
33 Unlimited,
34 Limited {
35 permit: OwnedSemaphorePermit,
36 quota: MemoryQuota<M>,
37 },
38}
39
40impl<M: MemoryMetrics> MemoryGuard<M> {
41 pub(crate) fn unlimited() -> Self {
42 Self {
43 state: GuardState::Unlimited,
44 }
45 }
46
47 pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota<M>) -> Self {
48 Self {
49 state: GuardState::Limited { permit, quota },
50 }
51 }
52
53 pub fn granted_bytes(&self) -> u64 {
55 match &self.state {
56 GuardState::Unlimited => 0,
57 GuardState::Limited { permit, quota } => {
58 quota.permits_to_bytes(permit.num_permits() as u32)
59 }
60 }
61 }
62
63 pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> {
71 match &mut self.state {
72 GuardState::Unlimited => Ok(()),
73 GuardState::Limited { permit, quota } => {
74 if bytes == 0 {
75 return Ok(());
76 }
77
78 let additional_permits = quota.bytes_to_permits(bytes);
79 let current_permits = permit.num_permits() as u32;
80
81 ensure!(
82 current_permits.saturating_add(additional_permits) <= quota.limit_permits,
83 MemoryLimitExceededSnafu {
84 requested_bytes: bytes,
85 limit_bytes: quota.permits_to_bytes(quota.limit_permits)
86 }
87 );
88
89 let additional_permit = quota
90 .semaphore
91 .clone()
92 .acquire_many_owned(additional_permits)
93 .await
94 .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
95
96 permit.merge(additional_permit);
97 quota.update_in_use_metric();
98 debug!("Acquired additional {} bytes", bytes);
99 Ok(())
100 }
101 }
102 }
103
104 pub fn try_acquire_additional(&mut self, bytes: u64) -> bool {
109 match &mut self.state {
110 GuardState::Unlimited => true,
111 GuardState::Limited { permit, quota } => {
112 if bytes == 0 {
113 return true;
114 }
115
116 let additional_permits = quota.bytes_to_permits(bytes);
117
118 match quota
119 .semaphore
120 .clone()
121 .try_acquire_many_owned(additional_permits)
122 {
123 Ok(additional_permit) => {
124 permit.merge(additional_permit);
125 quota.update_in_use_metric();
126 debug!("Acquired additional {} bytes", bytes);
127 true
128 }
129 Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
130 quota.metrics.inc_rejected("try_acquire_additional");
131 false
132 }
133 }
134 }
135 }
136 }
137
138 pub async fn acquire_additional_with_policy(
148 &mut self,
149 bytes: u64,
150 policy: OnExhaustedPolicy,
151 ) -> Result<()> {
152 match policy {
153 OnExhaustedPolicy::Wait { timeout } => {
154 match tokio::time::timeout(timeout, self.acquire_additional(bytes)).await {
155 Ok(Ok(())) => Ok(()),
156 Ok(Err(e)) => Err(e),
157 Err(_elapsed) => MemoryAcquireTimeoutSnafu {
158 requested_bytes: bytes,
159 waited: timeout,
160 }
161 .fail(),
162 }
163 }
164 OnExhaustedPolicy::Fail => {
165 if self.try_acquire_additional(bytes) {
166 Ok(())
167 } else {
168 MemoryLimitExceededSnafu {
169 requested_bytes: bytes,
170 limit_bytes: match &self.state {
171 GuardState::Unlimited => 0, GuardState::Limited { quota, .. } => {
173 quota.permits_to_bytes(quota.limit_permits)
174 }
175 },
176 }
177 .fail()
178 }
179 }
180 }
181 }
182
183 pub fn release_partial(&mut self, bytes: u64) -> bool {
187 match &mut self.state {
188 GuardState::Unlimited => true,
189 GuardState::Limited { permit, quota } => {
190 if bytes == 0 {
191 return true;
192 }
193
194 let release_permits = quota.bytes_to_permits(bytes);
195
196 match permit.split(release_permits as usize) {
197 Some(released_permit) => {
198 let released_bytes =
199 quota.permits_to_bytes(released_permit.num_permits() as u32);
200 drop(released_permit);
201 quota.update_in_use_metric();
202 debug!("Released {} bytes from memory guard", released_bytes);
203 true
204 }
205 None => false,
206 }
207 }
208 }
209 }
210}
211
212impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
213 fn drop(&mut self) {
214 if let GuardState::Limited { permit, quota } =
215 mem::replace(&mut self.state, GuardState::Unlimited)
216 {
217 let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
218 drop(permit);
219 quota.update_in_use_metric();
220 debug!("Released memory: {} bytes", bytes);
221 }
222 }
223}
224
225impl<M: MemoryMetrics> fmt::Debug for MemoryGuard<M> {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 f.debug_struct("MemoryGuard")
228 .field("granted_bytes", &self.granted_bytes())
229 .finish()
230 }
231}