servers/http/result/
influxdb_result_v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use axum::Json;
16use axum::http::HeaderValue;
17use axum::response::{IntoResponse, Response};
18use common_query::{Output, OutputData};
19use common_recordbatch::{RecordBatch, util};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23use crate::error::{Error, Result};
24use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
25use crate::http::result::HttpOutputWriter;
26use crate::http::result::error_result::ErrorResponse;
27use crate::http::{Epoch, HttpResponse, ResponseFormat};
28
29#[derive(Debug, Default, Serialize, Deserialize)]
30pub struct SqlQuery {
31    pub db: Option<String>,
32    // Returns epoch timestamps with the specified precision.
33    // Both u and µ indicate microseconds.
34    // epoch = [ns,u,µ,ms,s],
35    pub epoch: Option<String>,
36    pub sql: Option<String>,
37}
38
39#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
40pub struct InfluxdbRecordsOutput {
41    // The SQL query does not return the table name, but in InfluxDB,
42    // we require the table name, so we set it to an empty string “”.
43    name: String,
44    pub(crate) columns: Vec<String>,
45    pub(crate) values: Vec<Vec<Value>>,
46}
47
48impl InfluxdbRecordsOutput {
49    pub fn new(columns: Vec<String>, values: Vec<Vec<Value>>) -> Self {
50        Self {
51            name: String::default(),
52            columns,
53            values,
54        }
55    }
56}
57
58impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
59    type Error = Error;
60
61    fn try_from(
62        (epoch, recordbatches): (Option<Epoch>, Vec<RecordBatch>),
63    ) -> Result<InfluxdbRecordsOutput, Self::Error> {
64        if recordbatches.is_empty() {
65            Ok(InfluxdbRecordsOutput::new(vec![], vec![]))
66        } else {
67            // Safety: ensured by previous empty check
68            let first = &recordbatches[0];
69            let schema = first.schema.clone();
70            let columns = schema
71                .column_schemas()
72                .iter()
73                .map(|cs| cs.name.clone())
74                .collect::<Vec<_>>();
75
76            let mut rows =
77                Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
78
79            let value_transformer =
80                move |value: datatypes::value::Value| -> datatypes::value::Value {
81                    match (value, epoch) {
82                        (datatypes::value::Value::Timestamp(ts), Some(epoch)) => {
83                            if let Some(converted) = epoch.convert_timestamp(ts) {
84                                datatypes::value::Value::Timestamp(converted)
85                            } else {
86                                datatypes::value::Value::Timestamp(ts)
87                            }
88                        }
89                        (value, _) => value,
90                    }
91                };
92
93            for recordbatch in recordbatches {
94                let mut writer =
95                    HttpOutputWriter::new(schema.num_columns(), Some(Box::new(value_transformer)));
96                writer.write(recordbatch, &mut rows)?;
97            }
98
99            Ok(InfluxdbRecordsOutput::new(columns, rows))
100        }
101    }
102}
103
104#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
105pub struct InfluxdbOutput {
106    pub statement_id: u32,
107    pub series: Vec<InfluxdbRecordsOutput>,
108}
109
110impl InfluxdbOutput {
111    pub fn num_rows(&self) -> usize {
112        self.series.iter().map(|r| r.values.len()).sum()
113    }
114
115    pub fn num_cols(&self) -> usize {
116        self.series
117            .first()
118            .map(|r| r.columns.len())
119            .unwrap_or(0usize)
120    }
121}
122
123#[derive(Serialize, Deserialize, Debug)]
124pub struct InfluxdbV1Response {
125    results: Vec<InfluxdbOutput>,
126    execution_time_ms: u64,
127}
128
129impl InfluxdbV1Response {
130    pub fn with_execution_time(mut self, execution_time: u64) -> Self {
131        self.execution_time_ms = execution_time;
132        self
133    }
134
135    /// Create a influxdb v1 response from query result
136    pub async fn from_output(
137        outputs: Vec<crate::error::Result<Output>>,
138        epoch: Option<Epoch>,
139    ) -> HttpResponse {
140        // TODO(sunng87): this api response structure cannot represent error well.
141        //  It hides successful execution results from error response
142        let mut results = Vec::with_capacity(outputs.len());
143        for (statement_id, out) in outputs.into_iter().enumerate() {
144            let statement_id = statement_id as u32;
145            match out {
146                Ok(o) => {
147                    match o.data {
148                        OutputData::AffectedRows(_) => {
149                            results.push(InfluxdbOutput {
150                                statement_id,
151                                series: vec![],
152                            });
153                        }
154                        OutputData::Stream(stream) => {
155                            // TODO(sunng87): streaming response
156                            match util::collect(stream).await {
157                                Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) {
158                                    Ok(rows) => {
159                                        results.push(InfluxdbOutput {
160                                            statement_id,
161                                            series: vec![rows],
162                                        });
163                                    }
164                                    Err(err) => {
165                                        return HttpResponse::Error(ErrorResponse::from_error(err));
166                                    }
167                                },
168                                Err(err) => {
169                                    return HttpResponse::Error(ErrorResponse::from_error(err));
170                                }
171                            }
172                        }
173                        OutputData::RecordBatches(rbs) => {
174                            match InfluxdbRecordsOutput::try_from((epoch, rbs.take())) {
175                                Ok(rows) => {
176                                    results.push(InfluxdbOutput {
177                                        statement_id,
178                                        series: vec![rows],
179                                    });
180                                }
181                                Err(err) => {
182                                    return HttpResponse::Error(ErrorResponse::from_error(err));
183                                }
184                            }
185                        }
186                    }
187                }
188                Err(err) => {
189                    return HttpResponse::Error(ErrorResponse::from_error(err));
190                }
191            }
192        }
193
194        HttpResponse::InfluxdbV1(InfluxdbV1Response {
195            results,
196            execution_time_ms: 0,
197        })
198    }
199
200    pub fn results(&self) -> &[InfluxdbOutput] {
201        &self.results
202    }
203
204    pub fn execution_time_ms(&self) -> u64 {
205        self.execution_time_ms
206    }
207}
208
209impl IntoResponse for InfluxdbV1Response {
210    fn into_response(self) -> Response {
211        let execution_time = self.execution_time_ms;
212        let mut resp = Json(self).into_response();
213        resp.headers_mut().insert(
214            &GREPTIME_DB_HEADER_FORMAT,
215            HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()),
216        );
217        resp.headers_mut().insert(
218            &GREPTIME_DB_HEADER_EXECUTION_TIME,
219            HeaderValue::from(execution_time),
220        );
221        resp
222    }
223}