frontend/instance/
influxdb.rs1use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use catalog::CatalogManagerRef;
20use client::Output;
21use common_error::ext::BoxedError;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use servers::error::{
25 AuthSnafu, CatalogSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu,
26};
27use servers::influxdb::InfluxdbRequest;
28use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
29use servers::query_handler::InfluxdbLineProtocolHandler;
30use session::context::QueryContextRef;
31use snafu::{OptionExt, ResultExt};
32
33use crate::instance::Instance;
34
35#[async_trait]
36impl InfluxdbLineProtocolHandler for Instance {
37 async fn exec(
38 &self,
39 request: InfluxdbRequest,
40 ctx: QueryContextRef,
41 ) -> servers::error::Result<Output> {
42 self.plugins
43 .get::<PermissionCheckerRef>()
44 .as_ref()
45 .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
46 .context(AuthSnafu)?;
47
48 let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
49 interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
50
51 let requests = request.try_into()?;
52
53 let aligner = InfluxdbLineTimestampAligner {
54 catalog_manager: self.catalog_manager(),
55 };
56 let requests = aligner.align_timestamps(requests, &ctx).await?;
57
58 let requests = interceptor_ref
59 .post_lines_conversion(requests, ctx.clone())
60 .await?;
61
62 self.handle_influx_row_inserts(requests, ctx)
63 .await
64 .map_err(BoxedError::new)
65 .context(servers::error::ExecuteGrpcQuerySnafu)
66 }
67}
68
69struct InfluxdbLineTimestampAligner<'a> {
72 catalog_manager: &'a CatalogManagerRef,
73}
74
75impl InfluxdbLineTimestampAligner<'_> {
76 async fn align_timestamps(
77 &self,
78 requests: RowInsertRequests,
79 query_context: &QueryContextRef,
80 ) -> servers::error::Result<RowInsertRequests> {
81 let mut inserts = requests.inserts;
82 for insert in inserts.iter_mut() {
83 let Some(rows) = &mut insert.rows else {
84 continue;
85 };
86
87 let Some(target_time_unit) = self
88 .catalog_manager
89 .table(
90 query_context.current_catalog(),
91 &query_context.current_schema(),
92 &insert.table_name,
93 Some(query_context),
94 )
95 .await
96 .context(CatalogSnafu)?
97 .map(|x| x.schema())
98 .and_then(|schema| {
99 schema.timestamp_column().map(|col| {
100 col.data_type
101 .as_timestamp()
102 .expect("Time index column is not of timestamp type?!")
103 .unit()
104 })
105 })
106 else {
107 continue;
108 };
109
110 let target_timestamp_type = match target_time_unit {
111 TimeUnit::Second => ColumnDataType::TimestampSecond,
112 TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
113 TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
114 TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
115 };
116 let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
117 if x.semantic_type() == SemanticType::Timestamp
118 && x.datatype() != target_timestamp_type
119 {
120 Some(i)
121 } else {
122 None
123 }
124 }) else {
125 continue;
126 };
127
128 rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
131
132 for row in rows.rows.iter_mut() {
133 let Some(time_value) = row
134 .values
135 .get_mut(to_be_aligned)
136 .and_then(|x| x.value_data.as_mut())
137 else {
138 continue;
139 };
140 *time_value = align_time_unit(time_value, target_time_unit)?;
141 }
142 }
143 Ok(RowInsertRequests { inserts })
144 }
145}
146
147fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
148 let timestamp = match value {
149 ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
150 ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
151 ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
152 ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
153 _ => {
154 return UnexpectedResultSnafu {
155 reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
156 }
157 .fail();
158 }
159 };
160
161 let timestamp = timestamp
162 .convert_to(target)
163 .with_context(|| TimestampOverflowSnafu {
164 error: format!("{:?} convert to {}", timestamp, target),
165 })?;
166
167 Ok(match target {
168 TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
169 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
170 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
171 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
172 })
173}