// servers/http/extractor.rs
use core::str;
use std::result::Result as StdResult;
use axum::async_trait;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::http::StatusCode;
use http::HeaderMap;
use pipeline::SelectInfo;
use crate::http::header::constants::{
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
/// Extractor carrying the optional value of the
/// `GREPTIME_LOG_TABLE_NAME_HEADER_NAME` request header.
///
/// The inner value is `None` when the request does not carry that header.
pub struct LogTableName(pub Option<String>);

#[async_trait]
impl<S> FromRequestParts<S> for LogTableName
where
    S: Send + Sync,
{
    /// Status code and message returned when the header cannot be decoded.
    type Rejection = (StatusCode, String);

    /// Reads the log table name header from the request parts.
    ///
    /// # Errors
    ///
    /// Propagates the rejection produced by `string_value_from_header`
    /// (a `400 Bad Request` when the header value is not valid UTF-8).
    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
        let table_name =
            string_value_from_header(&parts.headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?;
        Ok(LogTableName(table_name))
    }
}
/// Extractor carrying the optional value of the
/// `GREPTIME_TRACE_TABLE_NAME_HEADER_NAME` request header.
///
/// The inner value is `None` when the request does not carry that header.
pub struct TraceTableName(pub Option<String>);

#[async_trait]
impl<S> FromRequestParts<S> for TraceTableName
where
    S: Send + Sync,
{
    /// Status code and message returned when the header cannot be decoded.
    type Rejection = (StatusCode, String);

    /// Reads the trace table name header from the request parts.
    ///
    /// # Errors
    ///
    /// Propagates the rejection produced by `string_value_from_header`
    /// (a `400 Bad Request` when the header value is not valid UTF-8).
    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
        let table_name =
            string_value_from_header(&parts.headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME)?;
        Ok(TraceTableName(table_name))
    }
}
/// Extractor wrapping a [`SelectInfo`] derived from the
/// `GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME` request header.
pub struct SelectInfoWrapper(pub SelectInfo);

#[async_trait]
impl<S> FromRequestParts<S> for SelectInfoWrapper
where
    S: Send + Sync,
{
    /// Status code and message returned when the header cannot be decoded.
    type Rejection = (StatusCode, String);

    /// Builds the wrapper from the extract-keys header.
    ///
    /// A non-empty header value is converted with `SelectInfo::from`; a
    /// missing or empty header yields the default `SelectInfo`.
    ///
    /// # Errors
    ///
    /// Propagates the rejection produced by `string_value_from_header`
    /// (a `400 Bad Request` when the header value is not valid UTF-8).
    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
        let extract_keys =
            string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;

        // Absent and empty headers are treated the same way: fall back to the default.
        let select_info = match extract_keys {
            Some(keys) if !keys.is_empty() => SelectInfo::from(keys),
            _ => SelectInfo::default(),
        };

        Ok(SelectInfoWrapper(select_info))
    }
}
/// Pipeline name and version taken from the request headers.
pub struct PipelineInfo {
    /// Value of the `GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME` header, if present.
    pub pipeline_name: Option<String>,
    /// Value of the `GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME` header; only
    /// kept when a pipeline name is also present.
    pub pipeline_version: Option<String>,
}

#[async_trait]
impl<S> FromRequestParts<S> for PipelineInfo
where
    S: Send + Sync,
{
    /// Status code and message returned when a header cannot be decoded.
    type Rejection = (StatusCode, String);

    /// Reads the pipeline name and version headers from the request parts.
    ///
    /// Both headers are always read and validated. The version is discarded
    /// when no pipeline name was supplied.
    ///
    /// # Errors
    ///
    /// Propagates the rejection produced by `string_value_from_header`
    /// (a `400 Bad Request` when either header value is not valid UTF-8).
    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
        let name =
            string_value_from_header(&parts.headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?;
        let version =
            string_value_from_header(&parts.headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?;

        // A version without a name is dropped; otherwise it is passed through as-is.
        let pipeline_version = if name.is_some() { version } else { None };

        Ok(PipelineInfo {
            pipeline_name: name,
            pipeline_version,
        })
    }
}
/// Looks up `header_key` in `headers` and decodes its value as a UTF-8 string.
///
/// Returns `Ok(None)` when the header is absent and `Ok(Some(value))` when its
/// bytes are valid UTF-8.
///
/// # Errors
///
/// Returns `(StatusCode::BAD_REQUEST, message)` when the header is present but
/// its bytes are not valid UTF-8.
#[inline]
fn string_value_from_header(
    headers: &HeaderMap,
    header_key: &str,
) -> StdResult<Option<String>, (StatusCode, String)> {
    match headers.get(header_key) {
        None => Ok(None),
        Some(raw) => match String::from_utf8(raw.as_bytes().to_vec()) {
            Ok(text) => Ok(Some(text)),
            Err(_) => Err((
                StatusCode::BAD_REQUEST,
                format!("`{}` header is not valid UTF-8 string type.", header_key),
            )),
        },
    }
}