servers/http/result/
influxdb_result_v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use axum::http::HeaderValue;
16use axum::response::{IntoResponse, Response};
17use axum::Json;
18use common_query::{Output, OutputData};
19use common_recordbatch::{util, RecordBatch};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22use snafu::ResultExt;
23
24use crate::error::{Error, ToJsonSnafu};
25use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
26use crate::http::result::error_result::ErrorResponse;
27use crate::http::{Epoch, HttpResponse, ResponseFormat};
28
29#[derive(Debug, Default, Serialize, Deserialize)]
30pub struct SqlQuery {
31    pub db: Option<String>,
32    // Returns epoch timestamps with the specified precision.
33    // Both u and µ indicate microseconds.
34    // epoch = [ns,u,µ,ms,s],
35    pub epoch: Option<String>,
36    pub sql: Option<String>,
37}
38
39#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
40pub struct InfluxdbRecordsOutput {
41    // The SQL query does not return the table name, but in InfluxDB,
42    // we require the table name, so we set it to an empty string “”.
43    name: String,
44    pub(crate) columns: Vec<String>,
45    pub(crate) values: Vec<Vec<Value>>,
46}
47
48impl InfluxdbRecordsOutput {
49    pub fn new(columns: Vec<String>, values: Vec<Vec<Value>>) -> Self {
50        Self {
51            name: String::default(),
52            columns,
53            values,
54        }
55    }
56}
57
58impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
59    type Error = Error;
60
61    fn try_from(
62        (epoch, recordbatches): (Option<Epoch>, Vec<RecordBatch>),
63    ) -> Result<InfluxdbRecordsOutput, Self::Error> {
64        if recordbatches.is_empty() {
65            Ok(InfluxdbRecordsOutput::new(vec![], vec![]))
66        } else {
67            // Safety: ensured by previous empty check
68            let first = &recordbatches[0];
69            let columns = first
70                .schema
71                .column_schemas()
72                .iter()
73                .map(|cs| cs.name.clone())
74                .collect::<Vec<_>>();
75
76            let mut rows =
77                Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
78
79            for recordbatch in recordbatches {
80                for row in recordbatch.rows() {
81                    let value_row = row
82                        .into_iter()
83                        .map(|value| {
84                            let value = match (epoch, &value) {
85                                (Some(epoch), datatypes::value::Value::Timestamp(ts)) => {
86                                    if let Some(timestamp) = epoch.convert_timestamp(*ts) {
87                                        datatypes::value::Value::Timestamp(timestamp)
88                                    } else {
89                                        value
90                                    }
91                                }
92                                _ => value,
93                            };
94                            Value::try_from(value)
95                        })
96                        .collect::<Result<Vec<Value>, _>>()
97                        .context(ToJsonSnafu)?;
98
99                    rows.push(value_row);
100                }
101            }
102
103            Ok(InfluxdbRecordsOutput::new(columns, rows))
104        }
105    }
106}
107
108#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
109pub struct InfluxdbOutput {
110    pub statement_id: u32,
111    pub series: Vec<InfluxdbRecordsOutput>,
112}
113
114impl InfluxdbOutput {
115    pub fn num_rows(&self) -> usize {
116        self.series.iter().map(|r| r.values.len()).sum()
117    }
118
119    pub fn num_cols(&self) -> usize {
120        self.series
121            .first()
122            .map(|r| r.columns.len())
123            .unwrap_or(0usize)
124    }
125}
126
127#[derive(Serialize, Deserialize, Debug)]
128pub struct InfluxdbV1Response {
129    results: Vec<InfluxdbOutput>,
130    execution_time_ms: u64,
131}
132
133impl InfluxdbV1Response {
134    pub fn with_execution_time(mut self, execution_time: u64) -> Self {
135        self.execution_time_ms = execution_time;
136        self
137    }
138
139    /// Create a influxdb v1 response from query result
140    pub async fn from_output(
141        outputs: Vec<crate::error::Result<Output>>,
142        epoch: Option<Epoch>,
143    ) -> HttpResponse {
144        // TODO(sunng87): this api response structure cannot represent error well.
145        //  It hides successful execution results from error response
146        let mut results = Vec::with_capacity(outputs.len());
147        for (statement_id, out) in outputs.into_iter().enumerate() {
148            let statement_id = statement_id as u32;
149            match out {
150                Ok(o) => {
151                    match o.data {
152                        OutputData::AffectedRows(_) => {
153                            results.push(InfluxdbOutput {
154                                statement_id,
155                                series: vec![],
156                            });
157                        }
158                        OutputData::Stream(stream) => {
159                            // TODO(sunng87): streaming response
160                            match util::collect(stream).await {
161                                Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) {
162                                    Ok(rows) => {
163                                        results.push(InfluxdbOutput {
164                                            statement_id,
165                                            series: vec![rows],
166                                        });
167                                    }
168                                    Err(err) => {
169                                        return HttpResponse::Error(ErrorResponse::from_error(err));
170                                    }
171                                },
172                                Err(err) => {
173                                    return HttpResponse::Error(ErrorResponse::from_error(err));
174                                }
175                            }
176                        }
177                        OutputData::RecordBatches(rbs) => {
178                            match InfluxdbRecordsOutput::try_from((epoch, rbs.take())) {
179                                Ok(rows) => {
180                                    results.push(InfluxdbOutput {
181                                        statement_id,
182                                        series: vec![rows],
183                                    });
184                                }
185                                Err(err) => {
186                                    return HttpResponse::Error(ErrorResponse::from_error(err));
187                                }
188                            }
189                        }
190                    }
191                }
192                Err(err) => {
193                    return HttpResponse::Error(ErrorResponse::from_error(err));
194                }
195            }
196        }
197
198        HttpResponse::InfluxdbV1(InfluxdbV1Response {
199            results,
200            execution_time_ms: 0,
201        })
202    }
203
204    pub fn results(&self) -> &[InfluxdbOutput] {
205        &self.results
206    }
207
208    pub fn execution_time_ms(&self) -> u64 {
209        self.execution_time_ms
210    }
211}
212
213impl IntoResponse for InfluxdbV1Response {
214    fn into_response(self) -> Response {
215        let execution_time = self.execution_time_ms;
216        let mut resp = Json(self).into_response();
217        resp.headers_mut().insert(
218            &GREPTIME_DB_HEADER_FORMAT,
219            HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()),
220        );
221        resp.headers_mut().insert(
222            &GREPTIME_DB_HEADER_EXECUTION_TIME,
223            HeaderValue::from(execution_time),
224        );
225        resp
226    }
227}