frontend/instance/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use catalog::CatalogManagerRef;
20use client::Output;
21use common_error::ext::BoxedError;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use servers::error::{
25    AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu,
26};
27use servers::influxdb::InfluxdbRequest;
28use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
29use servers::query_handler::InfluxdbLineProtocolHandler;
30use session::context::QueryContextRef;
31use snafu::{OptionExt, ResultExt};
32
33use crate::instance::Instance;
34
35#[async_trait]
36impl InfluxdbLineProtocolHandler for Instance {
37    async fn exec(
38        &self,
39        request: InfluxdbRequest,
40        ctx: QueryContextRef,
41    ) -> servers::error::Result<Output> {
42        self.plugins
43            .get::<PermissionCheckerRef>()
44            .as_ref()
45            .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
46            .context(AuthSnafu)?;
47
48        let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
49        interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
50
51        let requests = request.try_into()?;
52
53        let aligner = InfluxdbLineTimestampAligner {
54            catalog_manager: self.catalog_manager(),
55        };
56        let requests = aligner.align_timestamps(requests, &ctx).await?;
57
58        let requests = interceptor_ref
59            .post_lines_conversion(requests, ctx.clone())
60            .await?;
61
62        let _guard = if let Some(limiter) = &self.limiter {
63            Some(
64                limiter
65                    .limit_row_inserts(&requests)
66                    .await
67                    .map_err(BoxedError::new)
68                    .context(OtherSnafu)?,
69            )
70        } else {
71            None
72        };
73
74        self.handle_influx_row_inserts(requests, ctx)
75            .await
76            .map_err(BoxedError::new)
77            .context(servers::error::ExecuteGrpcQuerySnafu)
78    }
79}
80
81/// Align the timestamp precisions in Influxdb lines (after they are converted to the GRPC row
82/// inserts) to the time index columns' time units of the created tables (if there are any).
83struct InfluxdbLineTimestampAligner<'a> {
84    catalog_manager: &'a CatalogManagerRef,
85}
86
87impl InfluxdbLineTimestampAligner<'_> {
88    async fn align_timestamps(
89        &self,
90        requests: RowInsertRequests,
91        query_context: &QueryContextRef,
92    ) -> servers::error::Result<RowInsertRequests> {
93        let mut inserts = requests.inserts;
94        for insert in inserts.iter_mut() {
95            let Some(rows) = &mut insert.rows else {
96                continue;
97            };
98
99            let Some(target_time_unit) = self
100                .catalog_manager
101                .table(
102                    query_context.current_catalog(),
103                    &query_context.current_schema(),
104                    &insert.table_name,
105                    Some(query_context),
106                )
107                .await
108                .context(CatalogSnafu)?
109                .map(|x| x.schema())
110                .and_then(|schema| {
111                    schema.timestamp_column().map(|col| {
112                        col.data_type
113                            .as_timestamp()
114                            .expect("Time index column is not of timestamp type?!")
115                            .unit()
116                    })
117                })
118            else {
119                continue;
120            };
121
122            let target_timestamp_type = match target_time_unit {
123                TimeUnit::Second => ColumnDataType::TimestampSecond,
124                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
125                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
126                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
127            };
128            let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
129                if x.semantic_type() == SemanticType::Timestamp
130                    && x.datatype() != target_timestamp_type
131                {
132                    Some(i)
133                } else {
134                    None
135                }
136            }) else {
137                continue;
138            };
139
140            // Indexing safety: `to_be_aligned` is guaranteed to be a valid index because it's got
141            // from "enumerate" the schema vector above.
142            rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
143
144            for row in rows.rows.iter_mut() {
145                let Some(time_value) = row
146                    .values
147                    .get_mut(to_be_aligned)
148                    .and_then(|x| x.value_data.as_mut())
149                else {
150                    continue;
151                };
152                *time_value = align_time_unit(time_value, target_time_unit)?;
153            }
154        }
155        Ok(RowInsertRequests { inserts })
156    }
157}
158
159fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
160    let timestamp = match value {
161        ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
162        ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
163        ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
164        ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
165        _ => {
166            return UnexpectedResultSnafu {
167                reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
168            }
169            .fail();
170        }
171    };
172
173    let timestamp = timestamp
174        .convert_to(target)
175        .with_context(|| TimestampOverflowSnafu {
176            error: format!("{:?} convert to {}", timestamp, target),
177        })?;
178
179    Ok(match target {
180        TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
181        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
182        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
183        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
184    })
185}