1use axum::body::Body;
16use axum::http::Request;
17use axum::middleware::Next;
18use axum::response::Response;
19use common_telemetry::debug;
20use session::context::QueryContext;
21use session::hints::is_reserved_extension_key;
22
23use crate::hint_headers;
24
25pub async fn extract_hints(mut request: Request<Body>, next: Next) -> Response {
26 let hints = hint_headers::extract_hints(request.headers());
27 if let Some(query_ctx) = request.extensions_mut().get_mut::<QueryContext>() {
28 apply_hints(query_ctx, hints);
29 }
30 next.run(request).await
31}
32
33fn apply_hints(query_ctx: &mut QueryContext, hints: Vec<(String, String)>) {
34 for (key, value) in hints {
35 if is_reserved_extension_key(&key) {
36 debug!(
37 key = key.as_str(),
38 "Ignoring reserved external query context extension key"
39 );
40 continue;
41 }
42 query_ctx.set_extension(key, value);
43 }
44}
45
46#[cfg(test)]
47mod tests {
48 use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY as COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
49 use query::options::FLOW_SCHEDULED_TIME_MILLIS;
50 use session::context::{QueryContextBuilder, generate_remote_query_id};
51 use session::hints::{
52 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY,
53 };
54
55 use super::apply_hints;
56
57 #[test]
58 fn test_apply_hints_ignores_reserved_extension_keys_only() {
59 let original_query_id = generate_remote_query_id();
60 let mut query_ctx = QueryContextBuilder::default()
61 .set_extension(
62 REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
63 original_query_id.clone(),
64 )
65 .build();
66
67 apply_hints(
68 &mut query_ctx,
69 vec![
70 (
71 REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
72 "spoofed".to_string(),
73 ),
74 (
75 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(),
76 "spoofed-regs".to_string(),
77 ),
78 (
79 FLOW_SCHEDULED_TIME_MILLIS.to_string(),
80 "1700000000000".to_string(),
81 ),
82 ("ttl".to_string(), "7d".to_string()),
83 ],
84 );
85
86 assert_eq!(
87 query_ctx.remote_query_id(),
88 Some(original_query_id.as_str())
89 );
90 assert!(
91 query_ctx
92 .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
93 .is_none()
94 );
95 assert_eq!(
96 query_ctx.extension(FLOW_SCHEDULED_TIME_MILLIS),
97 Some("1700000000000")
98 );
99 assert_eq!(query_ctx.extension("ttl"), Some("7d"));
100 }
101
102 #[test]
103 fn test_initial_dyn_filter_registration_key_matches_common_query_constant() {
104 assert_eq!(
105 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
106 COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY
107 );
108 }
109}