servers/http/
extractor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::str;
16
17use ahash::HashSet;
18use axum::extract::FromRequestParts;
19use axum::http::request::Parts;
20use axum::http::StatusCode;
21use http::HeaderMap;
22use pipeline::{truthy, GreptimePipelineParams, SelectInfo};
23
24use crate::http::header::constants::{
25    GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
26    GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
27    GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME,
28    GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME,
29    GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME,
30    GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
31    GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME,
32    GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
33};
34
/// Axum extractor for the optional target log table name, read from the HTTP
/// header keyed by [`GREPTIME_LOG_TABLE_NAME_HEADER_NAME`].
///
/// The wrapped value is `None` when the header is absent. Extraction is
/// rejected with `400 Bad Request` when the header value is not valid UTF-8.
pub struct LogTableName(pub Option<String>);
38
39impl<S> FromRequestParts<S> for LogTableName
40where
41    S: Send + Sync,
42{
43    type Rejection = (StatusCode, String);
44
45    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
46        let headers = &parts.headers;
47        string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
48    }
49}
50
/// Axum extractor for the optional target trace table name, read from the HTTP
/// header keyed by [`GREPTIME_TRACE_TABLE_NAME_HEADER_NAME`].
///
/// The wrapped value is `None` when the header is absent. Extraction is
/// rejected with `400 Bad Request` when the header value is not valid UTF-8.
pub struct TraceTableName(pub Option<String>);
54
55impl<S> FromRequestParts<S> for TraceTableName
56where
57    S: Send + Sync,
58{
59    type Rejection = (StatusCode, String);
60
61    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
62        let headers = &parts.headers;
63        string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
64            .map(TraceTableName)
65    }
66}
67
/// Axum extractor for select keys from the HTTP header keyed by
/// [`GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME`],
/// to extract and uplift key-values from OTLP attributes.
///
/// When the header is absent or its value is empty, the wrapped value is
/// `SelectInfo::default()`. See [`SelectInfo`] for more details.
pub struct SelectInfoWrapper(pub SelectInfo);
72
73impl<S> FromRequestParts<S> for SelectInfoWrapper
74where
75    S: Send + Sync,
76{
77    type Rejection = (StatusCode, String);
78
79    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
80        let select =
81            string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
82
83        match select {
84            Some(name) => {
85                if name.is_empty() {
86                    Ok(SelectInfoWrapper(Default::default()))
87                } else {
88                    Ok(SelectInfoWrapper(SelectInfo::from(name)))
89                }
90            }
91            None => Ok(SelectInfoWrapper(Default::default())),
92        }
93    }
94}
95
/// Axum extractor for the optional pipeline name, pipeline version
/// and pipeline parameters carried in HTTP headers.
pub struct PipelineInfo {
    /// Pipeline name, taken from [`GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME`] or,
    /// when that header is absent, from [`GREPTIME_PIPELINE_NAME_HEADER_NAME`].
    pub pipeline_name: Option<String>,
    /// Pipeline version, taken from [`GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME`] or,
    /// when that header is absent, from [`GREPTIME_PIPELINE_VERSION_HEADER_NAME`].
    pub pipeline_version: Option<String>,
    /// Parameters built by [`GreptimePipelineParams::from_params`] from the value of
    /// the [`GREPTIME_PIPELINE_PARAMS_HEADER`] header (`None` is passed when absent).
    pub pipeline_params: GreptimePipelineParams,
}
103
104impl<S> FromRequestParts<S> for PipelineInfo
105where
106    S: Send + Sync,
107{
108    type Rejection = (StatusCode, String);
109
110    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
111        let headers = &parts.headers;
112        let pipeline_name = string_value_from_header(
113            headers,
114            &[
115                GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
116                GREPTIME_PIPELINE_NAME_HEADER_NAME,
117            ],
118        )?;
119        let pipeline_version = string_value_from_header(
120            headers,
121            &[
122                GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
123                GREPTIME_PIPELINE_VERSION_HEADER_NAME,
124            ],
125        )?;
126        let pipeline_parameters =
127            string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
128
129        Ok(PipelineInfo {
130            pipeline_name,
131            pipeline_version,
132            pipeline_params: GreptimePipelineParams::from_params(pipeline_parameters.as_deref()),
133        })
134    }
135}
136
/// Axum extractor for OTLP metric options from HTTP headers.
pub struct OtlpMetricOptions {
    /// Persist all resource attributes to the table.
    /// If false, persist only the selected attributes. See [`Self::resource_attrs`].
    pub promote_all_resource_attrs: bool,

    /// If `promote_all_resource_attrs` is true, this is an exclude list read from the
    /// [`GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME`] header.
    /// If `promote_all_resource_attrs` is false, this is an include list read from the
    /// [`GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME`] header.
    /// The header value is split on `;` and each entry is trimmed.
    pub resource_attrs: HashSet<String>,

    /// Persist scope attributes to the table.
    /// If false, persist none.
    pub promote_scope_attrs: bool,
}
151
152impl<S> FromRequestParts<S> for OtlpMetricOptions
153where
154    S: Send + Sync,
155{
156    type Rejection = (StatusCode, String);
157
158    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
159        let headers = &parts.headers;
160        let promote_all_resource_attrs = string_value_from_header(
161            headers,
162            &[GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME],
163        )?
164        .map(truthy)
165        .unwrap_or(false);
166
167        let attr_header = if promote_all_resource_attrs {
168            [GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME]
169        } else {
170            [GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME]
171        };
172
173        let resource_attrs = string_value_from_header(headers, &attr_header)?
174            .map(|s| s.split(';').map(|s| s.trim().to_string()).collect())
175            .unwrap_or_default();
176
177        let promote_scope_attrs = string_value_from_header(
178            headers,
179            &[GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME],
180        )?
181        .map(truthy)
182        .unwrap_or(false);
183
184        Ok(OtlpMetricOptions {
185            promote_all_resource_attrs,
186            resource_attrs,
187            promote_scope_attrs,
188        })
189    }
190}
191
192#[inline]
193fn string_value_from_header(
194    headers: &HeaderMap,
195    header_keys: &[&str],
196) -> Result<Option<String>, (StatusCode, String)> {
197    for header_key in header_keys {
198        if let Some(value) = headers.get(*header_key) {
199            return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
200                (
201                    StatusCode::BAD_REQUEST,
202                    format!("`{}` header is not valid UTF-8 string type.", header_key),
203                )
204            }))
205            .transpose();
206        }
207    }
208
209    Ok(None)
210}