servers/http/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use axum::extract::{Query, State};
19use axum::http::StatusCode;
20use axum::response::IntoResponse;
21use axum::Extension;
22use common_catalog::consts::DEFAULT_SCHEMA_NAME;
23use common_grpc::precision::Precision;
24use common_telemetry::tracing;
25use session::context::{Channel, QueryContext, QueryContextRef};
26
27use crate::error::{Result, TimePrecisionSnafu};
28use crate::http::header::write_cost_header_map;
29use crate::influxdb::InfluxdbRequest;
30use crate::query_handler::InfluxdbLineProtocolHandlerRef;
31
32// https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint
33#[axum_macros::debug_handler]
34pub async fn influxdb_ping() -> Result<impl IntoResponse> {
35    Ok(StatusCode::NO_CONTENT)
36}
37
38// https://docs.influxdata.com/influxdb/v1.8/tools/api/#health-http-endpoint
39#[axum_macros::debug_handler]
40pub async fn influxdb_health() -> Result<impl IntoResponse> {
41    Ok(StatusCode::OK)
42}
43
44#[axum_macros::debug_handler]
45#[tracing::instrument(skip_all, fields(protocol = "influxdb", request_type = "write_v1"))]
46pub async fn influxdb_write_v1(
47    State(handler): State<InfluxdbLineProtocolHandlerRef>,
48    Query(mut params): Query<HashMap<String, String>>,
49    Extension(mut query_ctx): Extension<QueryContext>,
50    lines: String,
51) -> Result<impl IntoResponse> {
52    let db = params
53        .remove("db")
54        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
55    query_ctx.set_channel(Channel::Influx);
56    let query_ctx = Arc::new(query_ctx);
57
58    let precision = params
59        .get("precision")
60        .map(|val| parse_time_precision(val))
61        .transpose()?;
62
63    influxdb_write(&db, precision, lines, handler, query_ctx).await
64}
65
66#[axum_macros::debug_handler]
67#[tracing::instrument(skip_all, fields(protocol = "influxdb", request_type = "write_v2"))]
68pub async fn influxdb_write_v2(
69    State(handler): State<InfluxdbLineProtocolHandlerRef>,
70    Query(mut params): Query<HashMap<String, String>>,
71    Extension(mut query_ctx): Extension<QueryContext>,
72    lines: String,
73) -> Result<impl IntoResponse> {
74    let db = match (params.remove("db"), params.remove("bucket")) {
75        (_, Some(bucket)) => bucket.clone(),
76        (Some(db), None) => db.clone(),
77        _ => DEFAULT_SCHEMA_NAME.to_string(),
78    };
79    query_ctx.set_channel(Channel::Influx);
80    let query_ctx = Arc::new(query_ctx);
81
82    let precision = params
83        .get("precision")
84        .map(|val| parse_time_precision(val))
85        .transpose()?;
86
87    influxdb_write(&db, precision, lines, handler, query_ctx).await
88}
89
90pub async fn influxdb_write(
91    db: &str,
92    precision: Option<Precision>,
93    lines: String,
94    handler: InfluxdbLineProtocolHandlerRef,
95    ctx: QueryContextRef,
96) -> Result<impl IntoResponse> {
97    let _timer = crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED
98        .with_label_values(&[db])
99        .start_timer();
100
101    let request = InfluxdbRequest { precision, lines };
102    let output = handler.exec(request, ctx).await?;
103
104    Ok((
105        StatusCode::NO_CONTENT,
106        write_cost_header_map(output.meta.cost),
107    ))
108}
109
110fn parse_time_precision(value: &str) -> Result<Precision> {
111    // Precision conversion needs to be compatible with influxdb v1 v2 api.
112    // For details, see the Influxdb documents.
113    // https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2write-http-endpoint
114    // https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
115    match value {
116        "n" | "ns" => Ok(Precision::Nanosecond),
117        "u" | "us" => Ok(Precision::Microsecond),
118        "ms" => Ok(Precision::Millisecond),
119        "s" => Ok(Precision::Second),
120        "m" => Ok(Precision::Minute),
121        "h" => Ok(Precision::Hour),
122        unknown => TimePrecisionSnafu {
123            name: unknown.to_string(),
124        }
125        .fail(),
126    }
127}
128
#[cfg(test)]
mod tests {
    use common_grpc::precision::Precision;

    use super::parse_time_precision;

    /// Every supported spelling maps to its precision; anything else is an error.
    #[test]
    fn test_parse_time_precision() {
        let accepted = [
            ("n", Precision::Nanosecond),
            ("ns", Precision::Nanosecond),
            ("u", Precision::Microsecond),
            ("us", Precision::Microsecond),
            ("ms", Precision::Millisecond),
            ("s", Precision::Second),
            ("m", Precision::Minute),
            ("h", Precision::Hour),
        ];

        for (text, expected) in accepted {
            assert_eq!(expected, parse_time_precision(text).unwrap());
        }

        assert!(parse_time_precision("unknown").is_err());
    }
}