1use std::collections::HashMap;
16use std::sync::Arc;
17
18use axum::extract::{Query, State};
19use axum::http::StatusCode;
20use axum::response::IntoResponse;
21use axum::Extension;
22use common_catalog::consts::DEFAULT_SCHEMA_NAME;
23use common_grpc::precision::Precision;
24use common_telemetry::tracing;
25use session::context::{Channel, QueryContext, QueryContextRef};
26
27use crate::error::{Result, TimePrecisionSnafu};
28use crate::http::header::write_cost_header_map;
29use crate::influxdb::InfluxdbRequest;
30use crate::query_handler::InfluxdbLineProtocolHandlerRef;
31
32#[axum_macros::debug_handler]
34pub async fn influxdb_ping() -> Result<impl IntoResponse> {
35 Ok(StatusCode::NO_CONTENT)
36}
37
38#[axum_macros::debug_handler]
40pub async fn influxdb_health() -> Result<impl IntoResponse> {
41 Ok(StatusCode::OK)
42}
43
44#[axum_macros::debug_handler]
45#[tracing::instrument(skip_all, fields(protocol = "influxdb", request_type = "write_v1"))]
46pub async fn influxdb_write_v1(
47 State(handler): State<InfluxdbLineProtocolHandlerRef>,
48 Query(mut params): Query<HashMap<String, String>>,
49 Extension(mut query_ctx): Extension<QueryContext>,
50 lines: String,
51) -> Result<impl IntoResponse> {
52 let db = params
53 .remove("db")
54 .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
55 query_ctx.set_channel(Channel::Influx);
56 let query_ctx = Arc::new(query_ctx);
57
58 let precision = params
59 .get("precision")
60 .map(|val| parse_time_precision(val))
61 .transpose()?;
62
63 influxdb_write(&db, precision, lines, handler, query_ctx).await
64}
65
66#[axum_macros::debug_handler]
67#[tracing::instrument(skip_all, fields(protocol = "influxdb", request_type = "write_v2"))]
68pub async fn influxdb_write_v2(
69 State(handler): State<InfluxdbLineProtocolHandlerRef>,
70 Query(mut params): Query<HashMap<String, String>>,
71 Extension(mut query_ctx): Extension<QueryContext>,
72 lines: String,
73) -> Result<impl IntoResponse> {
74 let db = match (params.remove("db"), params.remove("bucket")) {
75 (_, Some(bucket)) => bucket.clone(),
76 (Some(db), None) => db.clone(),
77 _ => DEFAULT_SCHEMA_NAME.to_string(),
78 };
79 query_ctx.set_channel(Channel::Influx);
80 let query_ctx = Arc::new(query_ctx);
81
82 let precision = params
83 .get("precision")
84 .map(|val| parse_time_precision(val))
85 .transpose()?;
86
87 influxdb_write(&db, precision, lines, handler, query_ctx).await
88}
89
90pub async fn influxdb_write(
91 db: &str,
92 precision: Option<Precision>,
93 lines: String,
94 handler: InfluxdbLineProtocolHandlerRef,
95 ctx: QueryContextRef,
96) -> Result<impl IntoResponse> {
97 let _timer = crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED
98 .with_label_values(&[db])
99 .start_timer();
100
101 let request = InfluxdbRequest { precision, lines };
102 let output = handler.exec(request, ctx).await?;
103
104 Ok((
105 StatusCode::NO_CONTENT,
106 write_cost_header_map(output.meta.cost),
107 ))
108}
109
110fn parse_time_precision(value: &str) -> Result<Precision> {
111 match value {
116 "n" | "ns" => Ok(Precision::Nanosecond),
117 "u" | "us" => Ok(Precision::Microsecond),
118 "ms" => Ok(Precision::Millisecond),
119 "s" => Ok(Precision::Second),
120 "m" => Ok(Precision::Minute),
121 "h" => Ok(Precision::Hour),
122 unknown => TimePrecisionSnafu {
123 name: unknown.to_string(),
124 }
125 .fail(),
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use common_grpc::precision::Precision;
132
133 use crate::http::influxdb::parse_time_precision;
134
135 #[test]
136 fn test_parse_time_precision() {
137 assert_eq!(Precision::Nanosecond, parse_time_precision("n").unwrap());
138 assert_eq!(Precision::Nanosecond, parse_time_precision("ns").unwrap());
139 assert_eq!(Precision::Microsecond, parse_time_precision("u").unwrap());
140 assert_eq!(Precision::Microsecond, parse_time_precision("us").unwrap());
141 assert_eq!(Precision::Millisecond, parse_time_precision("ms").unwrap());
142 assert_eq!(Precision::Second, parse_time_precision("s").unwrap());
143 assert_eq!(Precision::Minute, parse_time_precision("m").unwrap());
144 assert_eq!(Precision::Hour, parse_time_precision("h").unwrap());
145 assert!(parse_time_precision("unknown").is_err());
146 }
147}