servers/request_memory_limiter.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unified memory limiter for all server request protocols.
16
17use std::sync::Arc;
18
19use common_memory_manager::{MemoryGuard, MemoryManager, OnExhaustedPolicy, PermitGranularity};
20use snafu::ResultExt;
21
22use crate::error::{MemoryLimitExceededSnafu, Result};
23use crate::request_memory_metrics::RequestMemoryMetrics;
24
25/// Unified memory limiter for all server request protocols.
26///
27/// Manages a global memory pool for HTTP requests, gRPC messages, and
28/// Arrow Flight batches without distinguishing between them.
29#[derive(Clone)]
30pub struct ServerMemoryLimiter {
31 manager: Arc<MemoryManager<RequestMemoryMetrics>>,
32 policy: OnExhaustedPolicy,
33}
34
35impl Default for ServerMemoryLimiter {
36 /// Creates a limiter with unlimited memory (0 bytes) and default policy.
37 fn default() -> Self {
38 Self::new(0, OnExhaustedPolicy::default())
39 }
40}
41
42impl ServerMemoryLimiter {
43 /// Creates a new unified memory limiter.
44 ///
45 /// # Arguments
46 ///
47 /// * `total_bytes` - Maximum total memory for all concurrent requests (0 = unlimited)
48 /// * `policy` - Policy when memory quota is exhausted
49 pub fn new(total_bytes: u64, policy: OnExhaustedPolicy) -> Self {
50 let manager = Arc::new(MemoryManager::with_granularity(
51 total_bytes,
52 PermitGranularity::Kilobyte,
53 RequestMemoryMetrics,
54 ));
55
56 Self { manager, policy }
57 }
58
59 /// Acquire memory for a request.
60 pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<RequestMemoryMetrics>> {
61 self.manager
62 .acquire_with_policy(bytes, self.policy)
63 .await
64 .context(MemoryLimitExceededSnafu)
65 }
66
67 /// Try to acquire memory without waiting.
68 pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<RequestMemoryMetrics>> {
69 self.manager.try_acquire(bytes)
70 }
71
72 /// Returns total memory limit in bytes (0 if unlimited).
73 pub fn limit_bytes(&self) -> u64 {
74 self.manager.limit_bytes()
75 }
76}