frontend/instance/
influxdb.rs1use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use catalog::CatalogManagerRef;
20use client::Output;
21use common_error::ext::BoxedError;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use servers::error::{AuthSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu};
25use servers::influxdb::InfluxdbRequest;
26use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
27use servers::query_handler::InfluxdbLineProtocolHandler;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::instance::Instance;
32
33#[async_trait]
34impl InfluxdbLineProtocolHandler for Instance {
35 async fn exec(
36 &self,
37 request: InfluxdbRequest,
38 ctx: QueryContextRef,
39 ) -> servers::error::Result<Output> {
40 self.plugins
41 .get::<PermissionCheckerRef>()
42 .as_ref()
43 .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
44 .context(AuthSnafu)?;
45
46 let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
47 interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
48
49 let requests = request.try_into()?;
50
51 let aligner = InfluxdbLineTimestampAligner {
52 catalog_manager: self.catalog_manager(),
53 };
54 let requests = aligner.align_timestamps(requests, &ctx).await?;
55
56 let requests = interceptor_ref
57 .post_lines_conversion(requests, ctx.clone())
58 .await?;
59
60 self.handle_influx_row_inserts(requests, ctx)
61 .await
62 .map_err(BoxedError::new)
63 .context(servers::error::ExecuteGrpcQuerySnafu)
64 }
65}
66
67struct InfluxdbLineTimestampAligner<'a> {
70 catalog_manager: &'a CatalogManagerRef,
71}
72
73impl InfluxdbLineTimestampAligner<'_> {
74 async fn align_timestamps(
75 &self,
76 requests: RowInsertRequests,
77 query_context: &QueryContextRef,
78 ) -> servers::error::Result<RowInsertRequests> {
79 let mut inserts = requests.inserts;
80 for insert in inserts.iter_mut() {
81 let Some(rows) = &mut insert.rows else {
82 continue;
83 };
84
85 let Some(target_time_unit) = self
86 .catalog_manager
87 .table(
88 query_context.current_catalog(),
89 &query_context.current_schema(),
90 &insert.table_name,
91 Some(query_context),
92 )
93 .await?
94 .map(|x| x.schema())
95 .and_then(|schema| {
96 schema.timestamp_column().map(|col| {
97 col.data_type
98 .as_timestamp()
99 .expect("Time index column is not of timestamp type?!")
100 .unit()
101 })
102 })
103 else {
104 continue;
105 };
106
107 let target_timestamp_type = match target_time_unit {
108 TimeUnit::Second => ColumnDataType::TimestampSecond,
109 TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
110 TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
111 TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
112 };
113 let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
114 if x.semantic_type() == SemanticType::Timestamp
115 && x.datatype() != target_timestamp_type
116 {
117 Some(i)
118 } else {
119 None
120 }
121 }) else {
122 continue;
123 };
124
125 rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
128
129 for row in rows.rows.iter_mut() {
130 let Some(time_value) = row
131 .values
132 .get_mut(to_be_aligned)
133 .and_then(|x| x.value_data.as_mut())
134 else {
135 continue;
136 };
137 *time_value = align_time_unit(time_value, target_time_unit)?;
138 }
139 }
140 Ok(RowInsertRequests { inserts })
141 }
142}
143
144fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
145 let timestamp = match value {
146 ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
147 ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
148 ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
149 ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
150 _ => {
151 return UnexpectedResultSnafu {
152 reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
153 }
154 .fail();
155 }
156 };
157
158 let timestamp = timestamp
159 .convert_to(target)
160 .with_context(|| TimestampOverflowSnafu {
161 error: format!("{:?} convert to {}", timestamp, target),
162 })?;
163
164 Ok(match target {
165 TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
166 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
167 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
168 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
169 })
170}