servers/
request_memory_limiter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unified memory limiter for all server request protocols.
16
17use std::sync::Arc;
18
19use common_memory_manager::{MemoryGuard, MemoryManager, OnExhaustedPolicy, PermitGranularity};
20use snafu::ResultExt;
21
22use crate::error::{MemoryLimitExceededSnafu, Result};
23use crate::request_memory_metrics::RequestMemoryMetrics;
24
25/// Unified memory limiter for all server request protocols.
26///
27/// Manages a global memory pool for HTTP requests, gRPC messages, and
28/// Arrow Flight batches without distinguishing between them.
29#[derive(Clone)]
30pub struct ServerMemoryLimiter {
31    manager: Arc<MemoryManager<RequestMemoryMetrics>>,
32    policy: OnExhaustedPolicy,
33}
34
35impl Default for ServerMemoryLimiter {
36    /// Creates a limiter with unlimited memory (0 bytes) and default policy.
37    fn default() -> Self {
38        Self::new(0, OnExhaustedPolicy::default())
39    }
40}
41
42impl ServerMemoryLimiter {
43    /// Creates a new unified memory limiter.
44    ///
45    /// # Arguments
46    ///
47    /// * `total_bytes` - Maximum total memory for all concurrent requests (0 = unlimited)
48    /// * `policy` - Policy when memory quota is exhausted
49    pub fn new(total_bytes: u64, policy: OnExhaustedPolicy) -> Self {
50        let manager = Arc::new(MemoryManager::with_granularity(
51            total_bytes,
52            PermitGranularity::Kilobyte,
53            RequestMemoryMetrics,
54        ));
55
56        Self { manager, policy }
57    }
58
59    /// Acquire memory for a request.
60    pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<RequestMemoryMetrics>> {
61        self.manager
62            .acquire_with_policy(bytes, self.policy)
63            .await
64            .context(MemoryLimitExceededSnafu)
65    }
66
67    /// Try to acquire memory without waiting.
68    pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<RequestMemoryMetrics>> {
69        self.manager.try_acquire(bytes)
70    }
71
72    /// Returns total memory limit in bytes (0 if unlimited).
73    pub fn limit_bytes(&self) -> u64 {
74        self.manager.limit_bytes()
75    }
76}