1use core::str;
16
17use ahash::HashSet;
18use axum::extract::FromRequestParts;
19use axum::http::StatusCode;
20use axum::http::request::Parts;
21use http::HeaderMap;
22use pipeline::{GreptimePipelineParams, SelectInfo, truthy};
23use session::protocol_ctx::OtlpMetricTranslationStrategy;
24
25use crate::http::header::constants::{
26 GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
27 GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
28 GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME,
29 GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME,
30 GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME,
31 GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME,
32 GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
33 GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME,
34 GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
35};
36
37pub struct LogTableName(pub Option<String>);
40
41impl<S> FromRequestParts<S> for LogTableName
42where
43 S: Send + Sync,
44{
45 type Rejection = (StatusCode, String);
46
47 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
48 let headers = &parts.headers;
49 string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
50 }
51}
52
53pub struct TraceTableName(pub Option<String>);
56
57impl<S> FromRequestParts<S> for TraceTableName
58where
59 S: Send + Sync,
60{
61 type Rejection = (StatusCode, String);
62
63 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
64 let headers = &parts.headers;
65 string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
66 .map(TraceTableName)
67 }
68}
69
70pub struct SelectInfoWrapper(pub SelectInfo);
74
75impl<S> FromRequestParts<S> for SelectInfoWrapper
76where
77 S: Send + Sync,
78{
79 type Rejection = (StatusCode, String);
80
81 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
82 let select =
83 string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
84
85 match select {
86 Some(name) => {
87 if name.is_empty() {
88 Ok(SelectInfoWrapper(Default::default()))
89 } else {
90 Ok(SelectInfoWrapper(SelectInfo::from(name)))
91 }
92 }
93 None => Ok(SelectInfoWrapper(Default::default())),
94 }
95 }
96}
97
98pub struct PipelineInfo {
101 pub pipeline_name: Option<String>,
102 pub pipeline_version: Option<String>,
103 pub pipeline_params: GreptimePipelineParams,
104}
105
106impl<S> FromRequestParts<S> for PipelineInfo
107where
108 S: Send + Sync,
109{
110 type Rejection = (StatusCode, String);
111
112 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
113 let headers = &parts.headers;
114 let pipeline_name = string_value_from_header(
115 headers,
116 &[
117 GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
118 GREPTIME_PIPELINE_NAME_HEADER_NAME,
119 ],
120 )?;
121 let pipeline_version = string_value_from_header(
122 headers,
123 &[
124 GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
125 GREPTIME_PIPELINE_VERSION_HEADER_NAME,
126 ],
127 )?;
128 let pipeline_parameters =
129 string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
130
131 Ok(PipelineInfo {
132 pipeline_name,
133 pipeline_version,
134 pipeline_params: GreptimePipelineParams::from_params(pipeline_parameters.as_deref()),
135 })
136 }
137}
138
139pub struct OtlpMetricOptions {
141 pub promote_all_resource_attrs: bool,
144
145 pub resource_attrs: HashSet<String>,
148
149 pub promote_scope_attrs: bool,
152
153 pub metric_translation_strategy: OtlpMetricTranslationStrategy,
155}
156
157impl<S> FromRequestParts<S> for OtlpMetricOptions
158where
159 S: Send + Sync,
160{
161 type Rejection = (StatusCode, String);
162
163 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
164 let headers = &parts.headers;
165 let promote_all_resource_attrs = string_value_from_header(
166 headers,
167 &[GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME],
168 )?
169 .map(truthy)
170 .unwrap_or(false);
171
172 let attr_header = if promote_all_resource_attrs {
173 [GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME]
174 } else {
175 [GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME]
176 };
177
178 let resource_attrs = string_value_from_header(headers, &attr_header)?
179 .map(|s| s.split(';').map(|s| s.trim().to_string()).collect())
180 .unwrap_or_default();
181
182 let promote_scope_attrs = string_value_from_header(
183 headers,
184 &[GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME],
185 )?
186 .map(truthy)
187 .unwrap_or(false);
188
189 let metric_translation_strategy = string_value_from_header(
190 headers,
191 &[GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME],
192 )?
193 .map(|value| parse_otlp_metric_translation_strategy(&value))
194 .transpose()?
195 .unwrap_or_default();
196
197 Ok(OtlpMetricOptions {
198 promote_all_resource_attrs,
199 resource_attrs,
200 promote_scope_attrs,
201 metric_translation_strategy,
202 })
203 }
204}
205
206fn parse_otlp_metric_translation_strategy(
207 value: &str,
208) -> Result<OtlpMetricTranslationStrategy, (StatusCode, String)> {
209 value.parse().map_err(|_| {
210 (
211 StatusCode::BAD_REQUEST,
212 format!(
213 "`{}` header value `{}` is invalid. Expected one of: {}.",
214 GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME,
215 value,
216 OtlpMetricTranslationStrategy::VALUES.join(", ")
217 ),
218 )
219 })
220}
221
222#[inline]
223fn string_value_from_header(
224 headers: &HeaderMap,
225 header_keys: &[&str],
226) -> Result<Option<String>, (StatusCode, String)> {
227 for header_key in header_keys {
228 if let Some(value) = headers.get(*header_key) {
229 return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
230 (
231 StatusCode::BAD_REQUEST,
232 format!("`{}` header is not valid UTF-8 string type.", header_key),
233 )
234 }))
235 .transpose();
236 }
237 }
238
239 Ok(None)
240}
241
#[cfg(test)]
mod tests {
    use axum::http::Request;
    use session::protocol_ctx::OtlpMetricTranslationStrategy;

    use super::*;

    /// Builds request parts whose only header is the metric translation
    /// strategy header set to `value`.
    fn parts_with_strategy(value: &str) -> Parts {
        let request = Request::builder()
            .header(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME, value)
            .body(())
            .unwrap();
        request.into_parts().0
    }

    /// Every accepted spelling maps to its matching strategy variant.
    #[test]
    fn test_parse_otlp_metric_translation_strategy() {
        let accepted = [
            (
                "UnderscoreEscapingWithSuffixes",
                OtlpMetricTranslationStrategy::UnderscoreEscapingWithSuffixes,
            ),
            (
                "UnderscoreEscapingWithoutSuffixes",
                OtlpMetricTranslationStrategy::UnderscoreEscapingWithoutSuffixes,
            ),
            (
                "NoUTF8EscapingWithSuffixes",
                OtlpMetricTranslationStrategy::NoUtf8EscapingWithSuffixes,
            ),
            (
                "NoTranslation",
                OtlpMetricTranslationStrategy::NoTranslation,
            ),
        ];

        for (input, expected) in accepted {
            let parsed = parse_otlp_metric_translation_strategy(input).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    /// An unknown spelling is rejected with a descriptive 400 error.
    #[test]
    fn test_parse_otlp_metric_translation_strategy_rejects_invalid_value() {
        let (status, message) =
            parse_otlp_metric_translation_strategy("no_translation").unwrap_err();

        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert!(message.contains(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME));
        assert!(message.contains("no_translation"));
        assert!(message.contains("NoTranslation"));
    }

    /// The extractor surfaces the strategy named in the header.
    #[tokio::test]
    async fn test_otlp_metric_options_extracts_translation_strategy() {
        let mut parts = parts_with_strategy("NoTranslation");

        let extracted = OtlpMetricOptions::from_request_parts(&mut parts, &())
            .await
            .unwrap();

        assert_eq!(
            extracted.metric_translation_strategy,
            OtlpMetricTranslationStrategy::NoTranslation
        );
    }

    /// The extractor rejects a header value that is not a known strategy.
    #[tokio::test]
    async fn test_otlp_metric_options_rejects_invalid_translation_strategy() {
        let mut parts = parts_with_strategy("no_translation");

        // A let-else is used instead of `unwrap_err` so the success type does
        // not need to implement `Debug`.
        let Err((status, message)) =
            OtlpMetricOptions::from_request_parts(&mut parts, &()).await
        else {
            panic!("invalid metric translation strategy should be rejected");
        };

        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert!(message.contains(GREPTIME_OTLP_METRIC_TRANSLATION_STRATEGY_HEADER_NAME));
        assert!(message.contains("no_translation"));
    }
}