servers/http/result/
influxdb_result_v1.rs1use axum::Json;
16use axum::http::HeaderValue;
17use axum::response::{IntoResponse, Response};
18use common_query::{Output, OutputData};
19use common_recordbatch::{RecordBatch, util};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23use crate::error::{Error, Result};
24use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
25use crate::http::result::HttpOutputWriter;
26use crate::http::result::error_result::ErrorResponse;
27use crate::http::{Epoch, HttpResponse, ResponseFormat};
28
29#[derive(Debug, Default, Serialize, Deserialize)]
30pub struct SqlQuery {
31 pub db: Option<String>,
32 pub epoch: Option<String>,
36 pub sql: Option<String>,
37}
38
39#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
40pub struct InfluxdbRecordsOutput {
41 name: String,
44 pub(crate) columns: Vec<String>,
45 pub(crate) values: Vec<Vec<Value>>,
46}
47
48impl InfluxdbRecordsOutput {
49 pub fn new(columns: Vec<String>, values: Vec<Vec<Value>>) -> Self {
50 Self {
51 name: String::default(),
52 columns,
53 values,
54 }
55 }
56}
57
58impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
59 type Error = Error;
60
61 fn try_from(
62 (epoch, recordbatches): (Option<Epoch>, Vec<RecordBatch>),
63 ) -> Result<InfluxdbRecordsOutput, Self::Error> {
64 if recordbatches.is_empty() {
65 Ok(InfluxdbRecordsOutput::new(vec![], vec![]))
66 } else {
67 let first = &recordbatches[0];
69 let schema = first.schema.clone();
70 let columns = schema
71 .column_schemas()
72 .iter()
73 .map(|cs| cs.name.clone())
74 .collect::<Vec<_>>();
75
76 let mut rows =
77 Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
78
79 let value_transformer =
80 move |value: datatypes::value::Value| -> datatypes::value::Value {
81 match (value, epoch) {
82 (datatypes::value::Value::Timestamp(ts), Some(epoch)) => {
83 if let Some(converted) = epoch.convert_timestamp(ts) {
84 datatypes::value::Value::Timestamp(converted)
85 } else {
86 datatypes::value::Value::Timestamp(ts)
87 }
88 }
89 (value, _) => value,
90 }
91 };
92
93 for recordbatch in recordbatches {
94 let mut writer =
95 HttpOutputWriter::new(schema.num_columns(), Some(Box::new(value_transformer)));
96 writer.write(recordbatch, &mut rows)?;
97 }
98
99 Ok(InfluxdbRecordsOutput::new(columns, rows))
100 }
101 }
102}
103
104#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
105pub struct InfluxdbOutput {
106 pub statement_id: u32,
107 pub series: Vec<InfluxdbRecordsOutput>,
108}
109
110impl InfluxdbOutput {
111 pub fn num_rows(&self) -> usize {
112 self.series.iter().map(|r| r.values.len()).sum()
113 }
114
115 pub fn num_cols(&self) -> usize {
116 self.series
117 .first()
118 .map(|r| r.columns.len())
119 .unwrap_or(0usize)
120 }
121}
122
123#[derive(Serialize, Deserialize, Debug)]
124pub struct InfluxdbV1Response {
125 results: Vec<InfluxdbOutput>,
126 execution_time_ms: u64,
127}
128
129impl InfluxdbV1Response {
130 pub fn with_execution_time(mut self, execution_time: u64) -> Self {
131 self.execution_time_ms = execution_time;
132 self
133 }
134
135 pub async fn from_output(
137 outputs: Vec<crate::error::Result<Output>>,
138 epoch: Option<Epoch>,
139 ) -> HttpResponse {
140 let mut results = Vec::with_capacity(outputs.len());
143 for (statement_id, out) in outputs.into_iter().enumerate() {
144 let statement_id = statement_id as u32;
145 match out {
146 Ok(o) => {
147 match o.data {
148 OutputData::AffectedRows(_) => {
149 results.push(InfluxdbOutput {
150 statement_id,
151 series: vec![],
152 });
153 }
154 OutputData::Stream(stream) => {
155 match util::collect(stream).await {
157 Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) {
158 Ok(rows) => {
159 results.push(InfluxdbOutput {
160 statement_id,
161 series: vec![rows],
162 });
163 }
164 Err(err) => {
165 return HttpResponse::Error(ErrorResponse::from_error(err));
166 }
167 },
168 Err(err) => {
169 return HttpResponse::Error(ErrorResponse::from_error(err));
170 }
171 }
172 }
173 OutputData::RecordBatches(rbs) => {
174 match InfluxdbRecordsOutput::try_from((epoch, rbs.take())) {
175 Ok(rows) => {
176 results.push(InfluxdbOutput {
177 statement_id,
178 series: vec![rows],
179 });
180 }
181 Err(err) => {
182 return HttpResponse::Error(ErrorResponse::from_error(err));
183 }
184 }
185 }
186 }
187 }
188 Err(err) => {
189 return HttpResponse::Error(ErrorResponse::from_error(err));
190 }
191 }
192 }
193
194 HttpResponse::InfluxdbV1(InfluxdbV1Response {
195 results,
196 execution_time_ms: 0,
197 })
198 }
199
200 pub fn results(&self) -> &[InfluxdbOutput] {
201 &self.results
202 }
203
204 pub fn execution_time_ms(&self) -> u64 {
205 self.execution_time_ms
206 }
207}
208
209impl IntoResponse for InfluxdbV1Response {
210 fn into_response(self) -> Response {
211 let execution_time = self.execution_time_ms;
212 let mut resp = Json(self).into_response();
213 resp.headers_mut().insert(
214 &GREPTIME_DB_HEADER_FORMAT,
215 HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()),
216 );
217 resp.headers_mut().insert(
218 &GREPTIME_DB_HEADER_EXECUTION_TIME,
219 HeaderValue::from(execution_time),
220 );
221 resp
222 }
223}