Skip to main content

common_memory_manager/
guard.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, mem};
16
17use snafu::ensure;
18use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
19
20use crate::error::{
21    MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
22};
23use crate::manager::{MemoryMetrics, MemoryQuota, UnlimitedMemoryQuota};
24use crate::policy::OnExhaustedPolicy;
25
26/// Guard representing a slice of reserved memory.
27pub struct MemoryGuard<M: MemoryMetrics> {
28    pub(crate) state: GuardState<M>,
29}
30
31pub(crate) enum GuardState<M: MemoryMetrics> {
32    Released,
33    Unlimited {
34        quota: UnlimitedMemoryQuota<M>,
35        granted_bytes: u64,
36    },
37    Limited {
38        quota: MemoryQuota<M>,
39        permit: OwnedSemaphorePermit,
40    },
41}
42
43impl<M: MemoryMetrics> GuardState<M> {
44    fn release(self) {
45        match self {
46            GuardState::Released => {}
47            GuardState::Unlimited {
48                quota,
49                granted_bytes,
50            } => {
51                quota.sub_in_use(granted_bytes);
52            }
53            GuardState::Limited { quota, permit } => {
54                quota.release_permit(permit);
55            }
56        }
57    }
58}
59
60impl<M: MemoryMetrics> MemoryGuard<M> {
61    pub(crate) fn unlimited(quota: UnlimitedMemoryQuota<M>, bytes: u64) -> Self {
62        quota.add_in_use(bytes);
63        Self {
64            state: GuardState::Unlimited {
65                quota,
66                granted_bytes: bytes,
67            },
68        }
69    }
70
71    pub(crate) fn limited(quota: MemoryQuota<M>, permit: OwnedSemaphorePermit) -> Self {
72        Self {
73            state: GuardState::Limited { quota, permit },
74        }
75    }
76
77    /// Returns granted quota in bytes.
78    pub fn granted_bytes(&self) -> u64 {
79        match &self.state {
80            GuardState::Released => 0,
81            GuardState::Unlimited { granted_bytes, .. } => *granted_bytes,
82            GuardState::Limited { quota, permit } => {
83                quota.permits_to_bytes(permit.num_permits() as u32)
84            }
85        }
86    }
87
88    /// Acquires additional memory, waiting if necessary until enough is available.
89    ///
90    /// On success, merges the new memory into this guard.
91    ///
92    /// # Errors
93    /// - Returns error if requested bytes would exceed the manager's total limit
94    /// - Returns error if the semaphore is unexpectedly closed
95    pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> {
96        if bytes == 0 {
97            return Ok(());
98        }
99
100        match &mut self.state {
101            GuardState::Released => {
102                debug_assert!(false, "released memory guard state should not be reused");
103                Ok(())
104            }
105            GuardState::Unlimited {
106                quota,
107                granted_bytes,
108            } => {
109                quota.add_in_use(bytes);
110                *granted_bytes = granted_bytes.saturating_add(bytes);
111                Ok(())
112            }
113            GuardState::Limited { quota, permit } => {
114                let additional_permits = quota.bytes_to_permits(bytes);
115                let current_permits = permit.num_permits() as u32;
116
117                ensure!(
118                    current_permits.saturating_add(additional_permits) <= quota.limit_permits,
119                    MemoryLimitExceededSnafu {
120                        requested_bytes: bytes,
121                        limit_bytes: quota.permits_to_bytes(quota.limit_permits)
122                    }
123                );
124
125                let additional_permit = quota
126                    .semaphore
127                    .clone()
128                    .acquire_many_owned(additional_permits)
129                    .await
130                    .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
131
132                permit.merge(additional_permit);
133                quota.update_in_use_metric();
134                Ok(())
135            }
136        }
137    }
138
139    /// Tries to acquire additional memory without waiting.
140    ///
141    /// On success, merges the new memory into this guard and returns true.
142    /// On failure, returns false and leaves this guard unchanged.
143    pub fn try_acquire_additional(&mut self, bytes: u64) -> bool {
144        if bytes == 0 {
145            return true;
146        }
147
148        match &mut self.state {
149            GuardState::Released => {
150                debug_assert!(false, "released memory guard state should not be reused");
151                false
152            }
153            GuardState::Unlimited {
154                quota,
155                granted_bytes,
156            } => {
157                quota.add_in_use(bytes);
158                *granted_bytes = granted_bytes.saturating_add(bytes);
159                true
160            }
161            GuardState::Limited { quota, permit } => {
162                let additional_permits = quota.bytes_to_permits(bytes);
163
164                match quota
165                    .semaphore
166                    .clone()
167                    .try_acquire_many_owned(additional_permits)
168                {
169                    Ok(additional_permit) => {
170                        permit.merge(additional_permit);
171                        quota.update_in_use_metric();
172                        true
173                    }
174                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
175                        quota.metrics.inc_exhausted("try_acquire_additional");
176                        false
177                    }
178                }
179            }
180        }
181    }
182
183    /// Acquires additional memory based on the given policy.
184    ///
185    /// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
186    /// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
187    ///
188    /// # Errors
189    /// - `MemoryLimitExceeded`: Requested bytes would exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
190    /// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
191    /// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
192    pub async fn acquire_additional_with_policy(
193        &mut self,
194        bytes: u64,
195        policy: OnExhaustedPolicy,
196    ) -> Result<()> {
197        match policy {
198            OnExhaustedPolicy::Wait { timeout } => {
199                match tokio::time::timeout(timeout, self.acquire_additional(bytes)).await {
200                    Ok(Ok(())) => Ok(()),
201                    Ok(Err(e)) => Err(e),
202                    Err(_elapsed) => MemoryAcquireTimeoutSnafu {
203                        requested_bytes: bytes,
204                        waited: timeout,
205                    }
206                    .fail(),
207                }
208            }
209            OnExhaustedPolicy::Fail => {
210                if self.try_acquire_additional(bytes) {
211                    Ok(())
212                } else {
213                    MemoryLimitExceededSnafu {
214                        requested_bytes: bytes,
215                        limit_bytes: match &self.state {
216                            GuardState::Released => 0,
217                            GuardState::Unlimited { .. } => 0, // unreachable: unlimited mode always succeeds
218                            GuardState::Limited { quota, .. } => {
219                                quota.permits_to_bytes(quota.limit_permits)
220                            }
221                        },
222                    }
223                    .fail()
224                }
225            }
226        }
227    }
228
229    /// Releases a portion of granted memory back to the pool before the guard is dropped.
230    ///
231    /// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
232    pub fn release_partial(&mut self, bytes: u64) -> bool {
233        if bytes == 0 {
234            return true;
235        }
236
237        match &mut self.state {
238            GuardState::Released => true,
239            GuardState::Unlimited {
240                quota,
241                granted_bytes,
242            } => {
243                if bytes > *granted_bytes {
244                    return false;
245                }
246
247                quota.sub_in_use(bytes);
248                *granted_bytes = granted_bytes.saturating_sub(bytes);
249                true
250            }
251            GuardState::Limited { quota, permit } => {
252                let release_permits = quota.bytes_to_permits(bytes);
253
254                match permit.split(release_permits as usize) {
255                    Some(released_permit) => {
256                        quota.release_permit(released_permit);
257                        true
258                    }
259                    None => false,
260                }
261            }
262        }
263    }
264}
265
266impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
267    fn drop(&mut self) {
268        mem::replace(&mut self.state, GuardState::Released).release();
269    }
270}
271
272impl<M: MemoryMetrics> fmt::Debug for MemoryGuard<M> {
273    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274        f.debug_struct("MemoryGuard")
275            .field("granted_bytes", &self.granted_bytes())
276            .finish()
277    }
278}