common_memory_manager/
guard.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, mem};
16
17use common_telemetry::debug;
18use snafu::ensure;
19use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
20
21use crate::error::{
22    MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
23};
24use crate::manager::{MemoryMetrics, MemoryQuota};
25use crate::policy::OnExhaustedPolicy;
26
27/// Guard representing a slice of reserved memory.
28pub struct MemoryGuard<M: MemoryMetrics> {
29    pub(crate) state: GuardState<M>,
30}
31
32pub(crate) enum GuardState<M: MemoryMetrics> {
33    Unlimited,
34    Limited {
35        permit: OwnedSemaphorePermit,
36        quota: MemoryQuota<M>,
37    },
38}
39
40impl<M: MemoryMetrics> MemoryGuard<M> {
41    pub(crate) fn unlimited() -> Self {
42        Self {
43            state: GuardState::Unlimited,
44        }
45    }
46
47    pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota<M>) -> Self {
48        Self {
49            state: GuardState::Limited { permit, quota },
50        }
51    }
52
53    /// Returns granted quota in bytes.
54    pub fn granted_bytes(&self) -> u64 {
55        match &self.state {
56            GuardState::Unlimited => 0,
57            GuardState::Limited { permit, quota } => {
58                quota.permits_to_bytes(permit.num_permits() as u32)
59            }
60        }
61    }
62
63    /// Acquires additional memory, waiting if necessary until enough is available.
64    ///
65    /// On success, merges the new memory into this guard.
66    ///
67    /// # Errors
68    /// - Returns error if requested bytes would exceed the manager's total limit
69    /// - Returns error if the semaphore is unexpectedly closed
70    pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> {
71        match &mut self.state {
72            GuardState::Unlimited => Ok(()),
73            GuardState::Limited { permit, quota } => {
74                if bytes == 0 {
75                    return Ok(());
76                }
77
78                let additional_permits = quota.bytes_to_permits(bytes);
79                let current_permits = permit.num_permits() as u32;
80
81                ensure!(
82                    current_permits.saturating_add(additional_permits) <= quota.limit_permits,
83                    MemoryLimitExceededSnafu {
84                        requested_bytes: bytes,
85                        limit_bytes: quota.permits_to_bytes(quota.limit_permits)
86                    }
87                );
88
89                let additional_permit = quota
90                    .semaphore
91                    .clone()
92                    .acquire_many_owned(additional_permits)
93                    .await
94                    .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
95
96                permit.merge(additional_permit);
97                quota.update_in_use_metric();
98                debug!("Acquired additional {} bytes", bytes);
99                Ok(())
100            }
101        }
102    }
103
104    /// Tries to acquire additional memory without waiting.
105    ///
106    /// On success, merges the new memory into this guard and returns true.
107    /// On failure, returns false and leaves this guard unchanged.
108    pub fn try_acquire_additional(&mut self, bytes: u64) -> bool {
109        match &mut self.state {
110            GuardState::Unlimited => true,
111            GuardState::Limited { permit, quota } => {
112                if bytes == 0 {
113                    return true;
114                }
115
116                let additional_permits = quota.bytes_to_permits(bytes);
117
118                match quota
119                    .semaphore
120                    .clone()
121                    .try_acquire_many_owned(additional_permits)
122                {
123                    Ok(additional_permit) => {
124                        permit.merge(additional_permit);
125                        quota.update_in_use_metric();
126                        debug!("Acquired additional {} bytes", bytes);
127                        true
128                    }
129                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
130                        quota.metrics.inc_rejected("try_acquire_additional");
131                        false
132                    }
133                }
134            }
135        }
136    }
137
138    /// Acquires additional memory based on the given policy.
139    ///
140    /// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
141    /// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
142    ///
143    /// # Errors
144    /// - `MemoryLimitExceeded`: Requested bytes would exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
145    /// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
146    /// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
147    pub async fn acquire_additional_with_policy(
148        &mut self,
149        bytes: u64,
150        policy: OnExhaustedPolicy,
151    ) -> Result<()> {
152        match policy {
153            OnExhaustedPolicy::Wait { timeout } => {
154                match tokio::time::timeout(timeout, self.acquire_additional(bytes)).await {
155                    Ok(Ok(())) => Ok(()),
156                    Ok(Err(e)) => Err(e),
157                    Err(_elapsed) => MemoryAcquireTimeoutSnafu {
158                        requested_bytes: bytes,
159                        waited: timeout,
160                    }
161                    .fail(),
162                }
163            }
164            OnExhaustedPolicy::Fail => {
165                if self.try_acquire_additional(bytes) {
166                    Ok(())
167                } else {
168                    MemoryLimitExceededSnafu {
169                        requested_bytes: bytes,
170                        limit_bytes: match &self.state {
171                            GuardState::Unlimited => 0, // unreachable: unlimited mode always succeeds
172                            GuardState::Limited { quota, .. } => {
173                                quota.permits_to_bytes(quota.limit_permits)
174                            }
175                        },
176                    }
177                    .fail()
178                }
179            }
180        }
181    }
182
183    /// Releases a portion of granted memory back to the pool before the guard is dropped.
184    ///
185    /// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
186    pub fn release_partial(&mut self, bytes: u64) -> bool {
187        match &mut self.state {
188            GuardState::Unlimited => true,
189            GuardState::Limited { permit, quota } => {
190                if bytes == 0 {
191                    return true;
192                }
193
194                let release_permits = quota.bytes_to_permits(bytes);
195
196                match permit.split(release_permits as usize) {
197                    Some(released_permit) => {
198                        let released_bytes =
199                            quota.permits_to_bytes(released_permit.num_permits() as u32);
200                        drop(released_permit);
201                        quota.update_in_use_metric();
202                        debug!("Released {} bytes from memory guard", released_bytes);
203                        true
204                    }
205                    None => false,
206                }
207            }
208        }
209    }
210}
211
212impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
213    fn drop(&mut self) {
214        if let GuardState::Limited { permit, quota } =
215            mem::replace(&mut self.state, GuardState::Unlimited)
216        {
217            let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
218            drop(permit);
219            quota.update_in_use_metric();
220            debug!("Released memory: {} bytes", bytes);
221        }
222    }
223}
224
225impl<M: MemoryMetrics> fmt::Debug for MemoryGuard<M> {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        f.debug_struct("MemoryGuard")
228            .field("granted_bytes", &self.granted_bytes())
229            .finish()
230    }
231}