servers/http/
extractor.rs1use core::str;
16
17use axum::extract::FromRequestParts;
18use axum::http::request::Parts;
19use axum::http::StatusCode;
20use http::HeaderMap;
21use pipeline::{GreptimePipelineParams, SelectInfo};
22
23use crate::http::header::constants::{
24 GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
25 GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
26 GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER,
27 GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
28};
29
30pub struct LogTableName(pub Option<String>);
33
34impl<S> FromRequestParts<S> for LogTableName
35where
36 S: Send + Sync,
37{
38 type Rejection = (StatusCode, String);
39
40 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
41 let headers = &parts.headers;
42 string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
43 }
44}
45
46pub struct TraceTableName(pub Option<String>);
49
50impl<S> FromRequestParts<S> for TraceTableName
51where
52 S: Send + Sync,
53{
54 type Rejection = (StatusCode, String);
55
56 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
57 let headers = &parts.headers;
58 string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
59 .map(TraceTableName)
60 }
61}
62
63pub struct SelectInfoWrapper(pub SelectInfo);
67
68impl<S> FromRequestParts<S> for SelectInfoWrapper
69where
70 S: Send + Sync,
71{
72 type Rejection = (StatusCode, String);
73
74 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
75 let select =
76 string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
77
78 match select {
79 Some(name) => {
80 if name.is_empty() {
81 Ok(SelectInfoWrapper(Default::default()))
82 } else {
83 Ok(SelectInfoWrapper(SelectInfo::from(name)))
84 }
85 }
86 None => Ok(SelectInfoWrapper(Default::default())),
87 }
88 }
89}
90
91pub struct PipelineInfo {
94 pub pipeline_name: Option<String>,
95 pub pipeline_version: Option<String>,
96 pub pipeline_params: GreptimePipelineParams,
97}
98
99impl<S> FromRequestParts<S> for PipelineInfo
100where
101 S: Send + Sync,
102{
103 type Rejection = (StatusCode, String);
104
105 async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
106 let headers = &parts.headers;
107 let pipeline_name = string_value_from_header(
108 headers,
109 &[
110 GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
111 GREPTIME_PIPELINE_NAME_HEADER_NAME,
112 ],
113 )?;
114 let pipeline_version = string_value_from_header(
115 headers,
116 &[
117 GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
118 GREPTIME_PIPELINE_VERSION_HEADER_NAME,
119 ],
120 )?;
121 let pipeline_parameters =
122 string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
123
124 Ok(PipelineInfo {
125 pipeline_name,
126 pipeline_version,
127 pipeline_params: GreptimePipelineParams::from_params(pipeline_parameters.as_deref()),
128 })
129 }
130}
131
132#[inline]
133fn string_value_from_header(
134 headers: &HeaderMap,
135 header_keys: &[&str],
136) -> Result<Option<String>, (StatusCode, String)> {
137 for header_key in header_keys {
138 if let Some(value) = headers.get(*header_key) {
139 return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
140 (
141 StatusCode::BAD_REQUEST,
142 format!("`{}` header is not valid UTF-8 string type.", header_key),
143 )
144 }))
145 .transpose();
146 }
147 }
148
149 Ok(None)
150}