frontend/instance/
influxdb.rs1use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use catalog::CatalogManagerRef;
20use client::Output;
21use common_error::ext::BoxedError;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use servers::error::{
25 AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu,
26};
27use servers::influxdb::InfluxdbRequest;
28use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
29use servers::query_handler::InfluxdbLineProtocolHandler;
30use session::context::QueryContextRef;
31use snafu::{OptionExt, ResultExt};
32
33use crate::instance::Instance;
34
35#[async_trait]
36impl InfluxdbLineProtocolHandler for Instance {
37 async fn exec(
38 &self,
39 request: InfluxdbRequest,
40 ctx: QueryContextRef,
41 ) -> servers::error::Result<Output> {
42 self.plugins
43 .get::<PermissionCheckerRef>()
44 .as_ref()
45 .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
46 .context(AuthSnafu)?;
47
48 let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
49 interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
50
51 let requests = request.try_into()?;
52
53 let aligner = InfluxdbLineTimestampAligner {
54 catalog_manager: self.catalog_manager(),
55 };
56 let requests = aligner.align_timestamps(requests, &ctx).await?;
57
58 let requests = interceptor_ref
59 .post_lines_conversion(requests, ctx.clone())
60 .await?;
61
62 let _guard = if let Some(limiter) = &self.limiter {
63 Some(
64 limiter
65 .limit_row_inserts(&requests)
66 .await
67 .map_err(BoxedError::new)
68 .context(OtherSnafu)?,
69 )
70 } else {
71 None
72 };
73
74 self.handle_influx_row_inserts(requests, ctx)
75 .await
76 .map_err(BoxedError::new)
77 .context(servers::error::ExecuteGrpcQuerySnafu)
78 }
79}
80
81struct InfluxdbLineTimestampAligner<'a> {
84 catalog_manager: &'a CatalogManagerRef,
85}
86
87impl InfluxdbLineTimestampAligner<'_> {
88 async fn align_timestamps(
89 &self,
90 requests: RowInsertRequests,
91 query_context: &QueryContextRef,
92 ) -> servers::error::Result<RowInsertRequests> {
93 let mut inserts = requests.inserts;
94 for insert in inserts.iter_mut() {
95 let Some(rows) = &mut insert.rows else {
96 continue;
97 };
98
99 let Some(target_time_unit) = self
100 .catalog_manager
101 .table(
102 query_context.current_catalog(),
103 &query_context.current_schema(),
104 &insert.table_name,
105 Some(query_context),
106 )
107 .await
108 .context(CatalogSnafu)?
109 .map(|x| x.schema())
110 .and_then(|schema| {
111 schema.timestamp_column().map(|col| {
112 col.data_type
113 .as_timestamp()
114 .expect("Time index column is not of timestamp type?!")
115 .unit()
116 })
117 })
118 else {
119 continue;
120 };
121
122 let target_timestamp_type = match target_time_unit {
123 TimeUnit::Second => ColumnDataType::TimestampSecond,
124 TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
125 TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
126 TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
127 };
128 let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
129 if x.semantic_type() == SemanticType::Timestamp
130 && x.datatype() != target_timestamp_type
131 {
132 Some(i)
133 } else {
134 None
135 }
136 }) else {
137 continue;
138 };
139
140 rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
143
144 for row in rows.rows.iter_mut() {
145 let Some(time_value) = row
146 .values
147 .get_mut(to_be_aligned)
148 .and_then(|x| x.value_data.as_mut())
149 else {
150 continue;
151 };
152 *time_value = align_time_unit(time_value, target_time_unit)?;
153 }
154 }
155 Ok(RowInsertRequests { inserts })
156 }
157}
158
159fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
160 let timestamp = match value {
161 ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
162 ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
163 ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
164 ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
165 _ => {
166 return UnexpectedResultSnafu {
167 reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
168 }
169 .fail();
170 }
171 };
172
173 let timestamp = timestamp
174 .convert_to(target)
175 .with_context(|| TimestampOverflowSnafu {
176 error: format!("{:?} convert to {}", timestamp, target),
177 })?;
178
179 Ok(match target {
180 TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
181 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
182 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
183 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
184 })
185}