frontend/instance/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use catalog::CatalogManagerRef;
20use client::Output;
21use common_error::ext::BoxedError;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use servers::error::{
25    AuthSnafu, CatalogSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu,
26};
27use servers::influxdb::InfluxdbRequest;
28use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
29use servers::query_handler::InfluxdbLineProtocolHandler;
30use session::context::QueryContextRef;
31use snafu::{OptionExt, ResultExt};
32
33use crate::instance::Instance;
34
35#[async_trait]
36impl InfluxdbLineProtocolHandler for Instance {
37    async fn exec(
38        &self,
39        request: InfluxdbRequest,
40        ctx: QueryContextRef,
41    ) -> servers::error::Result<Output> {
42        self.plugins
43            .get::<PermissionCheckerRef>()
44            .as_ref()
45            .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
46            .context(AuthSnafu)?;
47
48        let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
49        interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
50
51        let requests = request.try_into()?;
52
53        let aligner = InfluxdbLineTimestampAligner {
54            catalog_manager: self.catalog_manager(),
55        };
56        let requests = aligner.align_timestamps(requests, &ctx).await?;
57
58        let requests = interceptor_ref
59            .post_lines_conversion(requests, ctx.clone())
60            .await?;
61
62        self.handle_influx_row_inserts(requests, ctx)
63            .await
64            .map_err(BoxedError::new)
65            .context(servers::error::ExecuteGrpcQuerySnafu)
66    }
67}
68
69/// Align the timestamp precisions in Influxdb lines (after they are converted to the GRPC row
70/// inserts) to the time index columns' time units of the created tables (if there are any).
71struct InfluxdbLineTimestampAligner<'a> {
72    catalog_manager: &'a CatalogManagerRef,
73}
74
75impl InfluxdbLineTimestampAligner<'_> {
76    async fn align_timestamps(
77        &self,
78        requests: RowInsertRequests,
79        query_context: &QueryContextRef,
80    ) -> servers::error::Result<RowInsertRequests> {
81        let mut inserts = requests.inserts;
82        for insert in inserts.iter_mut() {
83            let Some(rows) = &mut insert.rows else {
84                continue;
85            };
86
87            let Some(target_time_unit) = self
88                .catalog_manager
89                .table(
90                    query_context.current_catalog(),
91                    &query_context.current_schema(),
92                    &insert.table_name,
93                    Some(query_context),
94                )
95                .await
96                .context(CatalogSnafu)?
97                .map(|x| x.schema())
98                .and_then(|schema| {
99                    schema.timestamp_column().map(|col| {
100                        col.data_type
101                            .as_timestamp()
102                            .expect("Time index column is not of timestamp type?!")
103                            .unit()
104                    })
105                })
106            else {
107                continue;
108            };
109
110            let target_timestamp_type = match target_time_unit {
111                TimeUnit::Second => ColumnDataType::TimestampSecond,
112                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
113                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
114                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
115            };
116            let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
117                if x.semantic_type() == SemanticType::Timestamp
118                    && x.datatype() != target_timestamp_type
119                {
120                    Some(i)
121                } else {
122                    None
123                }
124            }) else {
125                continue;
126            };
127
128            // Indexing safety: `to_be_aligned` is guaranteed to be a valid index because it's got
129            // from "enumerate" the schema vector above.
130            rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
131
132            for row in rows.rows.iter_mut() {
133                let Some(time_value) = row
134                    .values
135                    .get_mut(to_be_aligned)
136                    .and_then(|x| x.value_data.as_mut())
137                else {
138                    continue;
139                };
140                *time_value = align_time_unit(time_value, target_time_unit)?;
141            }
142        }
143        Ok(RowInsertRequests { inserts })
144    }
145}
146
147fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
148    let timestamp = match value {
149        ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
150        ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
151        ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
152        ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
153        _ => {
154            return UnexpectedResultSnafu {
155                reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
156            }
157            .fail();
158        }
159    };
160
161    let timestamp = timestamp
162        .convert_to(target)
163        .with_context(|| TimestampOverflowSnafu {
164            error: format!("{:?} convert to {}", timestamp, target),
165        })?;
166
167    Ok(match target {
168        TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
169        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
170        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
171        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
172    })
173}