Skip to main content

servers/http/
hints.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use axum::body::Body;
16use axum::http::Request;
17use axum::middleware::Next;
18use axum::response::Response;
19use common_telemetry::debug;
20use session::context::QueryContext;
21use session::hints::is_reserved_extension_key;
22
23use crate::hint_headers;
24
25pub async fn extract_hints(mut request: Request<Body>, next: Next) -> Response {
26    let hints = hint_headers::extract_hints(request.headers());
27    if let Some(query_ctx) = request.extensions_mut().get_mut::<QueryContext>() {
28        apply_hints(query_ctx, hints);
29    }
30    next.run(request).await
31}
32
33fn apply_hints(query_ctx: &mut QueryContext, hints: Vec<(String, String)>) {
34    for (key, value) in hints {
35        if is_reserved_extension_key(&key) {
36            debug!(
37                key = key.as_str(),
38                "Ignoring reserved external query context extension key"
39            );
40            continue;
41        }
42        query_ctx.set_extension(key, value);
43    }
44}
45
46#[cfg(test)]
47mod tests {
48    use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY as COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
49    use query::options::FLOW_SCHEDULED_TIME_MILLIS;
50    use session::context::{QueryContextBuilder, generate_remote_query_id};
51    use session::hints::{
52        INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY,
53    };
54
55    use super::apply_hints;
56
57    #[test]
58    fn test_apply_hints_ignores_reserved_extension_keys_only() {
59        let original_query_id = generate_remote_query_id();
60        let mut query_ctx = QueryContextBuilder::default()
61            .set_extension(
62                REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
63                original_query_id.clone(),
64            )
65            .build();
66
67        apply_hints(
68            &mut query_ctx,
69            vec![
70                (
71                    REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
72                    "spoofed".to_string(),
73                ),
74                (
75                    INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(),
76                    "spoofed-regs".to_string(),
77                ),
78                (
79                    FLOW_SCHEDULED_TIME_MILLIS.to_string(),
80                    "1700000000000".to_string(),
81                ),
82                ("ttl".to_string(), "7d".to_string()),
83            ],
84        );
85
86        assert_eq!(
87            query_ctx.remote_query_id(),
88            Some(original_query_id.as_str())
89        );
90        assert!(
91            query_ctx
92                .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
93                .is_none()
94        );
95        assert_eq!(
96            query_ctx.extension(FLOW_SCHEDULED_TIME_MILLIS),
97            Some("1700000000000")
98        );
99        assert_eq!(query_ctx.extension("ttl"), Some("7d"));
100    }
101
102    #[test]
103    fn test_initial_dyn_filter_registration_key_matches_common_query_constant() {
104        assert_eq!(
105            INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
106            COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY
107        );
108    }
109}