1use std::future::Future;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18use std::time::Duration;
19
20use axum::body::Body;
21use axum::http::Request;
22use axum::response::Response;
23use http::StatusCode;
24use pin_project::pin_project;
25use tokio::time::{Instant, Sleep};
26use tower::{Layer, Service};
27
28use crate::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
29
30#[derive(Debug)]
36#[pin_project]
37pub struct ResponseFuture<T> {
38 #[pin]
39 inner: T,
40 #[pin]
41 sleep: Sleep,
42}
43
44impl<T> ResponseFuture<T> {
45 pub(crate) fn new(inner: T, sleep: Sleep) -> Self {
46 ResponseFuture { inner, sleep }
47 }
48}
49
50impl<F, E> Future for ResponseFuture<F>
51where
52 F: Future<Output = Result<Response, E>>,
53{
54 type Output = Result<Response, E>;
55
56 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57 let this = self.project();
58
59 if this.sleep.poll(cx).is_ready() {
60 let mut res = Response::default();
61 *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
62 return Poll::Ready(Ok(res));
63 }
64
65 this.inner.poll(cx)
66 }
67}
68
69#[derive(Debug, Clone)]
73pub struct DynamicTimeoutLayer {
74 default_timeout: Duration,
75}
76
77impl DynamicTimeoutLayer {
78 pub fn new(default_timeout: Duration) -> Self {
80 DynamicTimeoutLayer { default_timeout }
81 }
82}
83
84impl<S> Layer<S> for DynamicTimeoutLayer {
85 type Service = DynamicTimeout<S>;
86
87 fn layer(&self, service: S) -> Self::Service {
88 DynamicTimeout::new(service, self.default_timeout)
89 }
90}
91
92#[derive(Clone)]
94pub struct DynamicTimeout<S> {
95 inner: S,
96 default_timeout: Duration,
97}
98
99impl<S> DynamicTimeout<S> {
100 pub fn new(inner: S, default_timeout: Duration) -> Self {
102 DynamicTimeout {
103 inner,
104 default_timeout,
105 }
106 }
107}
108
109impl<S> Service<Request<Body>> for DynamicTimeout<S>
110where
111 S: Service<Request<Body>, Response = Response> + Send + 'static,
112{
113 type Response = S::Response;
114 type Error = S::Error;
115 type Future = ResponseFuture<S::Future>;
116
117 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
118 match self.inner.poll_ready(cx) {
119 Poll::Pending => Poll::Pending,
120 Poll::Ready(r) => Poll::Ready(r),
121 }
122 }
123
124 fn call(&mut self, request: Request<Body>) -> Self::Future {
125 let timeout = request
126 .headers()
127 .get(GREPTIME_DB_HEADER_TIMEOUT)
128 .and_then(|value| {
129 value
130 .to_str()
131 .ok()
132 .and_then(|value| humantime::parse_duration(value).ok())
133 })
134 .unwrap_or(self.default_timeout);
135 let response = self.inner.call(request);
136
137 if timeout.is_zero() {
138 let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
140 ResponseFuture::new(response, tokio::time::sleep_until(far_future))
141 } else {
142 let sleep = tokio::time::sleep(timeout);
143 ResponseFuture::new(response, sleep)
144 }
145 }
146}