servers/http/
timeout.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18use std::time::Duration;
19
20use axum::body::Body;
21use axum::http::Request;
22use axum::response::Response;
23use http::StatusCode;
24use pin_project::pin_project;
25use tokio::time::{Instant, Sleep};
26use tower::{Layer, Service};
27
28use crate::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
29
30/// [`Timeout`] response future
31///
32/// [`Timeout`]: crate::timeout::Timeout
33///
34/// Modified from https://github.com/tower-rs/tower-http/blob/tower-http-0.5.2/tower-http/src/timeout/service.rs
35#[derive(Debug)]
36#[pin_project]
37pub struct ResponseFuture<T> {
38    #[pin]
39    inner: T,
40    #[pin]
41    sleep: Sleep,
42}
43
44impl<T> ResponseFuture<T> {
45    pub(crate) fn new(inner: T, sleep: Sleep) -> Self {
46        ResponseFuture { inner, sleep }
47    }
48}
49
50impl<F, E> Future for ResponseFuture<F>
51where
52    F: Future<Output = Result<Response, E>>,
53{
54    type Output = Result<Response, E>;
55
56    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57        let this = self.project();
58
59        if this.sleep.poll(cx).is_ready() {
60            let mut res = Response::default();
61            *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
62            return Poll::Ready(Ok(res));
63        }
64
65        this.inner.poll(cx)
66    }
67}
68
69/// Applies a timeout to requests via the supplied inner service.
70///
71/// Modified from https://github.com/tower-rs/tower-http/blob/tower-http-0.5.2/tower-http/src/timeout/service.rs
72#[derive(Debug, Clone)]
73pub struct DynamicTimeoutLayer {
74    default_timeout: Duration,
75}
76
77impl DynamicTimeoutLayer {
78    /// Create a timeout from a duration
79    pub fn new(default_timeout: Duration) -> Self {
80        DynamicTimeoutLayer { default_timeout }
81    }
82}
83
84impl<S> Layer<S> for DynamicTimeoutLayer {
85    type Service = DynamicTimeout<S>;
86
87    fn layer(&self, service: S) -> Self::Service {
88        DynamicTimeout::new(service, self.default_timeout)
89    }
90}
91
92/// Modified from https://github.com/tower-rs/tower-http/blob/tower-http-0.5.2/tower-http/src/timeout/service.rs
93#[derive(Clone)]
94pub struct DynamicTimeout<S> {
95    inner: S,
96    default_timeout: Duration,
97}
98
99impl<S> DynamicTimeout<S> {
100    /// Create a new [`DynamicTimeout`] with the given timeout
101    pub fn new(inner: S, default_timeout: Duration) -> Self {
102        DynamicTimeout {
103            inner,
104            default_timeout,
105        }
106    }
107}
108
109impl<S> Service<Request<Body>> for DynamicTimeout<S>
110where
111    S: Service<Request<Body>, Response = Response> + Send + 'static,
112{
113    type Response = S::Response;
114    type Error = S::Error;
115    type Future = ResponseFuture<S::Future>;
116
117    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
118        match self.inner.poll_ready(cx) {
119            Poll::Pending => Poll::Pending,
120            Poll::Ready(r) => Poll::Ready(r),
121        }
122    }
123
124    fn call(&mut self, request: Request<Body>) -> Self::Future {
125        let timeout = request
126            .headers()
127            .get(GREPTIME_DB_HEADER_TIMEOUT)
128            .and_then(|value| {
129                value
130                    .to_str()
131                    .ok()
132                    .and_then(|value| humantime::parse_duration(value).ok())
133            })
134            .unwrap_or(self.default_timeout);
135        let response = self.inner.call(request);
136
137        if timeout.is_zero() {
138            // 30 years. See `Instant::far_future`.
139            let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
140            ResponseFuture::new(response, tokio::time::sleep_until(far_future))
141        } else {
142            let sleep = tokio::time::sleep(timeout);
143            ResponseFuture::new(response, sleep)
144        }
145    }
146}