Skip to main content

frontend/instance/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use catalog::CatalogManagerRef;
20use client::Output;
21use common_error::ext::BoxedError;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use servers::error::{AuthSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu};
25use servers::influxdb::InfluxdbRequest;
26use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
27use servers::query_handler::InfluxdbLineProtocolHandler;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::instance::Instance;
32
33#[async_trait]
34impl InfluxdbLineProtocolHandler for Instance {
35    async fn exec(
36        &self,
37        request: InfluxdbRequest,
38        ctx: QueryContextRef,
39    ) -> servers::error::Result<Output> {
40        self.plugins
41            .get::<PermissionCheckerRef>()
42            .as_ref()
43            .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
44            .context(AuthSnafu)?;
45
46        let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
47        interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
48
49        let requests = request.try_into()?;
50
51        let aligner = InfluxdbLineTimestampAligner {
52            catalog_manager: self.catalog_manager(),
53        };
54        let requests = aligner.align_timestamps(requests, &ctx).await?;
55
56        let requests = interceptor_ref
57            .post_lines_conversion(requests, ctx.clone())
58            .await?;
59
60        self.handle_influx_row_inserts(requests, ctx)
61            .await
62            .map_err(BoxedError::new)
63            .context(servers::error::ExecuteGrpcQuerySnafu)
64    }
65}
66
67/// Align the timestamp precisions in Influxdb lines (after they are converted to the GRPC row
68/// inserts) to the time index columns' time units of the created tables (if there are any).
69struct InfluxdbLineTimestampAligner<'a> {
70    catalog_manager: &'a CatalogManagerRef,
71}
72
73impl InfluxdbLineTimestampAligner<'_> {
74    async fn align_timestamps(
75        &self,
76        requests: RowInsertRequests,
77        query_context: &QueryContextRef,
78    ) -> servers::error::Result<RowInsertRequests> {
79        let mut inserts = requests.inserts;
80        for insert in inserts.iter_mut() {
81            let Some(rows) = &mut insert.rows else {
82                continue;
83            };
84
85            let Some(target_time_unit) = self
86                .catalog_manager
87                .table(
88                    query_context.current_catalog(),
89                    &query_context.current_schema(),
90                    &insert.table_name,
91                    Some(query_context),
92                )
93                .await?
94                .map(|x| x.schema())
95                .and_then(|schema| {
96                    schema.timestamp_column().map(|col| {
97                        col.data_type
98                            .as_timestamp()
99                            .expect("Time index column is not of timestamp type?!")
100                            .unit()
101                    })
102                })
103            else {
104                continue;
105            };
106
107            let target_timestamp_type = match target_time_unit {
108                TimeUnit::Second => ColumnDataType::TimestampSecond,
109                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
110                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
111                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
112            };
113            let Some(to_be_aligned) = rows.schema.iter().enumerate().find_map(|(i, x)| {
114                if x.semantic_type() == SemanticType::Timestamp
115                    && x.datatype() != target_timestamp_type
116                {
117                    Some(i)
118                } else {
119                    None
120                }
121            }) else {
122                continue;
123            };
124
125            // Indexing safety: `to_be_aligned` is guaranteed to be a valid index because it's got
126            // from "enumerate" the schema vector above.
127            rows.schema[to_be_aligned].datatype = target_timestamp_type as i32;
128
129            for row in rows.rows.iter_mut() {
130                let Some(time_value) = row
131                    .values
132                    .get_mut(to_be_aligned)
133                    .and_then(|x| x.value_data.as_mut())
134                else {
135                    continue;
136                };
137                *time_value = align_time_unit(time_value, target_time_unit)?;
138            }
139        }
140        Ok(RowInsertRequests { inserts })
141    }
142}
143
144fn align_time_unit(value: &ValueData, target: TimeUnit) -> servers::error::Result<ValueData> {
145    let timestamp = match value {
146        ValueData::TimestampSecondValue(x) => Timestamp::new_second(*x),
147        ValueData::TimestampMillisecondValue(x) => Timestamp::new_millisecond(*x),
148        ValueData::TimestampMicrosecondValue(x) => Timestamp::new_microsecond(*x),
149        ValueData::TimestampNanosecondValue(x) => Timestamp::new_nanosecond(*x),
150        _ => {
151            return UnexpectedResultSnafu {
152                reason: format!("Timestamp value '{:?}' is not of timestamp type!", value),
153            }
154            .fail();
155        }
156    };
157
158    let timestamp = timestamp
159        .convert_to(target)
160        .with_context(|| TimestampOverflowSnafu {
161            error: format!("{:?} convert to {}", timestamp, target),
162        })?;
163
164    Ok(match target {
165        TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
166        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
167        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
168        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
169    })
170}