servers/http/
extractor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::str;
16
17use axum::extract::FromRequestParts;
18use axum::http::request::Parts;
19use axum::http::StatusCode;
20use http::HeaderMap;
21use pipeline::{GreptimePipelineParams, SelectInfo};
22
23use crate::http::header::constants::{
24    GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
25    GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
26    GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER,
27    GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
28};
29
30/// Axum extractor for optional target log table name from HTTP header
31/// using [`GREPTIME_LOG_TABLE_NAME_HEADER_NAME`] as key.
32pub struct LogTableName(pub Option<String>);
33
34impl<S> FromRequestParts<S> for LogTableName
35where
36    S: Send + Sync,
37{
38    type Rejection = (StatusCode, String);
39
40    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
41        let headers = &parts.headers;
42        string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
43    }
44}
45
46/// Axum extractor for optional target trace table name from HTTP header
47/// using [`GREPTIME_TRACE_TABLE_NAME_HEADER_NAME`] as key.
48pub struct TraceTableName(pub Option<String>);
49
50impl<S> FromRequestParts<S> for TraceTableName
51where
52    S: Send + Sync,
53{
54    type Rejection = (StatusCode, String);
55
56    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
57        let headers = &parts.headers;
58        string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
59            .map(TraceTableName)
60    }
61}
62
63/// Axum extractor for select keys from HTTP header,
64/// to extract and uplift key-values from OTLP attributes.
65/// See [`SelectInfo`] for more details.
66pub struct SelectInfoWrapper(pub SelectInfo);
67
68impl<S> FromRequestParts<S> for SelectInfoWrapper
69where
70    S: Send + Sync,
71{
72    type Rejection = (StatusCode, String);
73
74    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
75        let select =
76            string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
77
78        match select {
79            Some(name) => {
80                if name.is_empty() {
81                    Ok(SelectInfoWrapper(Default::default()))
82                } else {
83                    Ok(SelectInfoWrapper(SelectInfo::from(name)))
84                }
85            }
86            None => Ok(SelectInfoWrapper(Default::default())),
87        }
88    }
89}
90
91/// Axum extractor for optional Pipeline name and version
92/// from HTTP headers.
93pub struct PipelineInfo {
94    pub pipeline_name: Option<String>,
95    pub pipeline_version: Option<String>,
96    pub pipeline_params: GreptimePipelineParams,
97}
98
99impl<S> FromRequestParts<S> for PipelineInfo
100where
101    S: Send + Sync,
102{
103    type Rejection = (StatusCode, String);
104
105    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
106        let headers = &parts.headers;
107        let pipeline_name = string_value_from_header(
108            headers,
109            &[
110                GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
111                GREPTIME_PIPELINE_NAME_HEADER_NAME,
112            ],
113        )?;
114        let pipeline_version = string_value_from_header(
115            headers,
116            &[
117                GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
118                GREPTIME_PIPELINE_VERSION_HEADER_NAME,
119            ],
120        )?;
121        let pipeline_parameters =
122            string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
123
124        Ok(PipelineInfo {
125            pipeline_name,
126            pipeline_version,
127            pipeline_params: GreptimePipelineParams::from_params(pipeline_parameters.as_deref()),
128        })
129    }
130}
131
132#[inline]
133fn string_value_from_header(
134    headers: &HeaderMap,
135    header_keys: &[&str],
136) -> Result<Option<String>, (StatusCode, String)> {
137    for header_key in header_keys {
138        if let Some(value) = headers.get(*header_key) {
139            return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
140                (
141                    StatusCode::BAD_REQUEST,
142                    format!("`{}` header is not valid UTF-8 string type.", header_key),
143                )
144            }))
145            .transpose();
146        }
147    }
148
149    Ok(None)
150}