servers/http/
extractor.rs1use core::str;
16
17use ahash::HashSet;
18use axum::extract::FromRequestParts;
19use axum::http::request::Parts;
20use axum::http::StatusCode;
21use http::HeaderMap;
22use pipeline::{truthy, GreptimePipelineParams, SelectInfo};
23
24use crate::http::header::constants::{
25 GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
26 GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
27 GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME,
28 GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME,
29 GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME,
30 GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
31 GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME,
32 GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
33};
34
35pub struct LogTableName(pub Option<String>);
38
39impl<S> FromRequestParts<S> for LogTableName
40where
41 S: Send + Sync,
42{
43 type Rejection = (StatusCode, String);
44
45 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
46 let headers = &parts.headers;
47 string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
48 }
49}
50
51pub struct TraceTableName(pub Option<String>);
54
55impl<S> FromRequestParts<S> for TraceTableName
56where
57 S: Send + Sync,
58{
59 type Rejection = (StatusCode, String);
60
61 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
62 let headers = &parts.headers;
63 string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
64 .map(TraceTableName)
65 }
66}
67
68pub struct SelectInfoWrapper(pub SelectInfo);
72
73impl<S> FromRequestParts<S> for SelectInfoWrapper
74where
75 S: Send + Sync,
76{
77 type Rejection = (StatusCode, String);
78
79 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
80 let select =
81 string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
82
83 match select {
84 Some(name) => {
85 if name.is_empty() {
86 Ok(SelectInfoWrapper(Default::default()))
87 } else {
88 Ok(SelectInfoWrapper(SelectInfo::from(name)))
89 }
90 }
91 None => Ok(SelectInfoWrapper(Default::default())),
92 }
93 }
94}
95
96pub struct PipelineInfo {
99 pub pipeline_name: Option<String>,
100 pub pipeline_version: Option<String>,
101 pub pipeline_params: GreptimePipelineParams,
102}
103
104impl<S> FromRequestParts<S> for PipelineInfo
105where
106 S: Send + Sync,
107{
108 type Rejection = (StatusCode, String);
109
110 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
111 let headers = &parts.headers;
112 let pipeline_name = string_value_from_header(
113 headers,
114 &[
115 GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
116 GREPTIME_PIPELINE_NAME_HEADER_NAME,
117 ],
118 )?;
119 let pipeline_version = string_value_from_header(
120 headers,
121 &[
122 GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
123 GREPTIME_PIPELINE_VERSION_HEADER_NAME,
124 ],
125 )?;
126 let pipeline_parameters =
127 string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
128
129 Ok(PipelineInfo {
130 pipeline_name,
131 pipeline_version,
132 pipeline_params: GreptimePipelineParams::from_params(pipeline_parameters.as_deref()),
133 })
134 }
135}
136
137pub struct OtlpMetricOptions {
139 pub promote_all_resource_attrs: bool,
142
143 pub resource_attrs: HashSet<String>,
146
147 pub promote_scope_attrs: bool,
150}
151
152impl<S> FromRequestParts<S> for OtlpMetricOptions
153where
154 S: Send + Sync,
155{
156 type Rejection = (StatusCode, String);
157
158 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
159 let headers = &parts.headers;
160 let promote_all_resource_attrs = string_value_from_header(
161 headers,
162 &[GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME],
163 )?
164 .map(truthy)
165 .unwrap_or(false);
166
167 let attr_header = if promote_all_resource_attrs {
168 [GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME]
169 } else {
170 [GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME]
171 };
172
173 let resource_attrs = string_value_from_header(headers, &attr_header)?
174 .map(|s| s.split(';').map(|s| s.trim().to_string()).collect())
175 .unwrap_or_default();
176
177 let promote_scope_attrs = string_value_from_header(
178 headers,
179 &[GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME],
180 )?
181 .map(truthy)
182 .unwrap_or(false);
183
184 Ok(OtlpMetricOptions {
185 promote_all_resource_attrs,
186 resource_attrs,
187 promote_scope_attrs,
188 })
189 }
190}
191
192#[inline]
193fn string_value_from_header(
194 headers: &HeaderMap,
195 header_keys: &[&str],
196) -> Result<Option<String>, (StatusCode, String)> {
197 for header_key in header_keys {
198 if let Some(value) = headers.get(*header_key) {
199 return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
200 (
201 StatusCode::BAD_REQUEST,
202 format!("`{}` header is not valid UTF-8 string type.", header_key),
203 )
204 }))
205 .transpose();
206 }
207 }
208
209 Ok(None)
210}