Skip to main content

common_memory_manager/
manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use snafu::ensure;
19use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
20
21use crate::error::{
22    MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
23};
24use crate::granularity::PermitGranularity;
25use crate::guard::MemoryGuard;
26use crate::policy::OnExhaustedPolicy;
27
/// Sink for the memory manager's usage metrics.
///
/// The manager keeps the metrics value inside its quota state and clones that
/// state freely (e.g. when handing out guards), so implementations should be
/// cheap to clone and safe to share between threads.
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
    /// Publishes the configured quota in bytes; the manager reports `0` when
    /// no limit is configured.
    fn set_limit(&self, bytes: i64);

    /// Publishes the number of bytes currently reserved.
    fn set_in_use(&self, bytes: i64);

    /// Counts one immediate (non-waiting) acquisition that failed because the
    /// quota was exhausted; `reason` labels the failing operation
    /// (e.g. `"try_acquire"`).
    fn inc_exhausted(&self, reason: &str);
}
35
36/// Generic memory manager for quota-controlled operations.
37#[derive(Clone)]
38pub struct MemoryManager<M: MemoryMetrics> {
39    quota: MemoryQuotaState<M>,
40}
41
42impl<M: MemoryMetrics + Default> Default for MemoryManager<M> {
43    fn default() -> Self {
44        Self::new(0, M::default())
45    }
46}
47
48#[derive(Clone)]
49pub(crate) struct MemoryQuota<M: MemoryMetrics> {
50    pub(crate) semaphore: Arc<Semaphore>,
51    pub(crate) limit_permits: u32,
52    pub(crate) granularity: PermitGranularity,
53    pub(crate) metrics: M,
54}
55
56#[derive(Clone)]
57pub(crate) struct UnlimitedMemoryQuota<M: MemoryMetrics> {
58    pub(crate) current_bytes: Arc<AtomicU64>,
59    pub(crate) metrics: M,
60}
61
62#[derive(Clone)]
63pub(crate) enum MemoryQuotaState<M: MemoryMetrics> {
64    Unlimited(UnlimitedMemoryQuota<M>),
65    Limited(MemoryQuota<M>),
66}
67
68impl<M: MemoryMetrics> MemoryManager<M> {
69    /// Creates a new memory manager with the given limit in bytes.
70    /// `limit_bytes = 0` disables the limit.
71    pub fn new(limit_bytes: u64, metrics: M) -> Self {
72        Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
73    }
74
75    /// Creates a new memory manager with specified granularity.
76    pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
77        if limit_bytes == 0 {
78            metrics.set_limit(0);
79            return Self {
80                quota: MemoryQuotaState::Unlimited(UnlimitedMemoryQuota {
81                    current_bytes: Arc::new(AtomicU64::new(0)),
82                    metrics,
83                }),
84            };
85        }
86
87        let limit_permits = granularity.bytes_to_permits(limit_bytes);
88        let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
89        metrics.set_limit(limit_aligned_bytes as i64);
90
91        Self {
92            quota: MemoryQuotaState::Limited(MemoryQuota {
93                semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
94                limit_permits,
95                granularity,
96                metrics,
97            }),
98        }
99    }
100
101    /// Returns the configured limit in bytes (0 if unlimited).
102    pub fn limit_bytes(&self) -> u64 {
103        match &self.quota {
104            MemoryQuotaState::Unlimited(_) => 0,
105            MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.limit_permits),
106        }
107    }
108
109    /// Returns currently used bytes.
110    pub fn used_bytes(&self) -> u64 {
111        match &self.quota {
112            MemoryQuotaState::Unlimited(quota) => quota.current_bytes.load(Ordering::Acquire),
113            MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.used_permits()),
114        }
115    }
116
117    /// Returns available bytes.
118    ///
119    /// Unlimited managers report `u64::MAX`.
120    pub fn available_bytes(&self) -> u64 {
121        match &self.quota {
122            MemoryQuotaState::Unlimited(_) => u64::MAX,
123            MemoryQuotaState::Limited(quota) => {
124                quota.permits_to_bytes(quota.available_permits_clamped())
125            }
126        }
127    }
128
129    /// Acquires memory, waiting if necessary until enough is available.
130    ///
131    /// # Errors
132    /// - Returns error if requested bytes exceed the total limit
133    /// - Returns error if the semaphore is unexpectedly closed
134    pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<M>> {
135        match &self.quota {
136            MemoryQuotaState::Unlimited(quota) => Ok(MemoryGuard::unlimited(quota.clone(), bytes)),
137            MemoryQuotaState::Limited(quota) => {
138                let permits = quota.bytes_to_permits(bytes);
139
140                ensure!(
141                    permits <= quota.limit_permits,
142                    MemoryLimitExceededSnafu {
143                        requested_bytes: bytes,
144                        limit_bytes: self.limit_bytes()
145                    }
146                );
147
148                let permit = quota
149                    .semaphore
150                    .clone()
151                    .acquire_many_owned(permits)
152                    .await
153                    .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
154                quota.update_in_use_metric();
155                Ok(MemoryGuard::limited(quota.clone(), permit))
156            }
157        }
158    }
159
160    /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
161    pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<M>> {
162        match &self.quota {
163            MemoryQuotaState::Unlimited(quota) => {
164                Some(MemoryGuard::unlimited(quota.clone(), bytes))
165            }
166            MemoryQuotaState::Limited(quota) => {
167                let permits = quota.bytes_to_permits(bytes);
168
169                match quota.semaphore.clone().try_acquire_many_owned(permits) {
170                    Ok(permit) => {
171                        quota.update_in_use_metric();
172                        Some(MemoryGuard::limited(quota.clone(), permit))
173                    }
174                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
175                        quota.metrics.inc_exhausted("try_acquire");
176                        None
177                    }
178                }
179            }
180        }
181    }
182
183    /// Acquires memory based on the given policy.
184    ///
185    /// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
186    /// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
187    ///
188    /// # Errors
189    /// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
190    /// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
191    /// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
192    pub async fn acquire_with_policy(
193        &self,
194        bytes: u64,
195        policy: OnExhaustedPolicy,
196    ) -> Result<MemoryGuard<M>> {
197        match policy {
198            OnExhaustedPolicy::Wait { timeout } => {
199                match tokio::time::timeout(timeout, self.acquire(bytes)).await {
200                    Ok(Ok(guard)) => Ok(guard),
201                    Ok(Err(e)) => Err(e),
202                    Err(_elapsed) => {
203                        // Timeout elapsed while waiting
204                        MemoryAcquireTimeoutSnafu {
205                            requested_bytes: bytes,
206                            waited: timeout,
207                        }
208                        .fail()
209                    }
210                }
211            }
212            OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
213                MemoryLimitExceededSnafu {
214                    requested_bytes: bytes,
215                    limit_bytes: self.limit_bytes(),
216                }
217                .build()
218            }),
219        }
220    }
221}
222
223impl<M: MemoryMetrics> MemoryQuota<M> {
224    pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
225        self.granularity.bytes_to_permits(bytes)
226    }
227
228    pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
229        self.granularity.permits_to_bytes(permits)
230    }
231
232    pub(crate) fn used_permits(&self) -> u32 {
233        self.limit_permits
234            .saturating_sub(self.available_permits_clamped())
235    }
236
237    pub(crate) fn available_permits_clamped(&self) -> u32 {
238        self.semaphore
239            .available_permits()
240            .min(self.limit_permits as usize) as u32
241    }
242
243    pub(crate) fn update_in_use_metric(&self) {
244        let bytes = self.permits_to_bytes(self.used_permits());
245        self.metrics.set_in_use(bytes as i64);
246    }
247
248    pub(crate) fn release_permit(&self, permit: OwnedSemaphorePermit) {
249        drop(permit);
250        self.update_in_use_metric();
251    }
252}
253
254impl<M: MemoryMetrics> UnlimitedMemoryQuota<M> {
255    pub(crate) fn add_in_use(&self, bytes: u64) {
256        if bytes == 0 {
257            return;
258        }
259
260        let previous = self
261            .current_bytes
262            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
263                Some(current.saturating_add(bytes))
264            })
265            .unwrap();
266        let new_total = previous.saturating_add(bytes);
267        debug_assert!(
268            new_total >= previous,
269            "unlimited memory usage counter overflowed"
270        );
271        self.metrics.set_in_use(new_total as i64);
272    }
273
274    pub(crate) fn sub_in_use(&self, bytes: u64) {
275        if bytes == 0 {
276            return;
277        }
278
279        let previous = self
280            .current_bytes
281            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
282                Some(current.saturating_sub(bytes))
283            })
284            .unwrap();
285        debug_assert!(
286            previous >= bytes,
287            "unlimited memory usage counter underflowed: current={previous}, release={bytes}"
288        );
289        let new_total = previous.saturating_sub(bytes);
290        self.metrics.set_in_use(new_total as i64);
291    }
292}