servers/http/result/
influxdb_result_v1.rs1use axum::http::HeaderValue;
16use axum::response::{IntoResponse, Response};
17use axum::Json;
18use common_query::{Output, OutputData};
19use common_recordbatch::{util, RecordBatch};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22use snafu::ResultExt;
23
24use crate::error::{Error, ToJsonSnafu};
25use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
26use crate::http::result::error_result::ErrorResponse;
27use crate::http::{Epoch, HttpResponse, ResponseFormat};
28
29#[derive(Debug, Default, Serialize, Deserialize)]
30pub struct SqlQuery {
31 pub db: Option<String>,
32 pub epoch: Option<String>,
36 pub sql: Option<String>,
37}
38
39#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
40pub struct InfluxdbRecordsOutput {
41 name: String,
44 pub(crate) columns: Vec<String>,
45 pub(crate) values: Vec<Vec<Value>>,
46}
47
48impl InfluxdbRecordsOutput {
49 pub fn new(columns: Vec<String>, values: Vec<Vec<Value>>) -> Self {
50 Self {
51 name: String::default(),
52 columns,
53 values,
54 }
55 }
56}
57
58impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
59 type Error = Error;
60
61 fn try_from(
62 (epoch, recordbatches): (Option<Epoch>, Vec<RecordBatch>),
63 ) -> Result<InfluxdbRecordsOutput, Self::Error> {
64 if recordbatches.is_empty() {
65 Ok(InfluxdbRecordsOutput::new(vec![], vec![]))
66 } else {
67 let first = &recordbatches[0];
69 let columns = first
70 .schema
71 .column_schemas()
72 .iter()
73 .map(|cs| cs.name.clone())
74 .collect::<Vec<_>>();
75
76 let mut rows =
77 Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
78
79 for recordbatch in recordbatches {
80 for row in recordbatch.rows() {
81 let value_row = row
82 .into_iter()
83 .map(|value| {
84 let value = match (epoch, &value) {
85 (Some(epoch), datatypes::value::Value::Timestamp(ts)) => {
86 if let Some(timestamp) = epoch.convert_timestamp(*ts) {
87 datatypes::value::Value::Timestamp(timestamp)
88 } else {
89 value
90 }
91 }
92 _ => value,
93 };
94 Value::try_from(value)
95 })
96 .collect::<Result<Vec<Value>, _>>()
97 .context(ToJsonSnafu)?;
98
99 rows.push(value_row);
100 }
101 }
102
103 Ok(InfluxdbRecordsOutput::new(columns, rows))
104 }
105 }
106}
107
108#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
109pub struct InfluxdbOutput {
110 pub statement_id: u32,
111 pub series: Vec<InfluxdbRecordsOutput>,
112}
113
114impl InfluxdbOutput {
115 pub fn num_rows(&self) -> usize {
116 self.series.iter().map(|r| r.values.len()).sum()
117 }
118
119 pub fn num_cols(&self) -> usize {
120 self.series
121 .first()
122 .map(|r| r.columns.len())
123 .unwrap_or(0usize)
124 }
125}
126
127#[derive(Serialize, Deserialize, Debug)]
128pub struct InfluxdbV1Response {
129 results: Vec<InfluxdbOutput>,
130 execution_time_ms: u64,
131}
132
133impl InfluxdbV1Response {
134 pub fn with_execution_time(mut self, execution_time: u64) -> Self {
135 self.execution_time_ms = execution_time;
136 self
137 }
138
139 pub async fn from_output(
141 outputs: Vec<crate::error::Result<Output>>,
142 epoch: Option<Epoch>,
143 ) -> HttpResponse {
144 let mut results = Vec::with_capacity(outputs.len());
147 for (statement_id, out) in outputs.into_iter().enumerate() {
148 let statement_id = statement_id as u32;
149 match out {
150 Ok(o) => {
151 match o.data {
152 OutputData::AffectedRows(_) => {
153 results.push(InfluxdbOutput {
154 statement_id,
155 series: vec![],
156 });
157 }
158 OutputData::Stream(stream) => {
159 match util::collect(stream).await {
161 Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) {
162 Ok(rows) => {
163 results.push(InfluxdbOutput {
164 statement_id,
165 series: vec![rows],
166 });
167 }
168 Err(err) => {
169 return HttpResponse::Error(ErrorResponse::from_error(err));
170 }
171 },
172 Err(err) => {
173 return HttpResponse::Error(ErrorResponse::from_error(err));
174 }
175 }
176 }
177 OutputData::RecordBatches(rbs) => {
178 match InfluxdbRecordsOutput::try_from((epoch, rbs.take())) {
179 Ok(rows) => {
180 results.push(InfluxdbOutput {
181 statement_id,
182 series: vec![rows],
183 });
184 }
185 Err(err) => {
186 return HttpResponse::Error(ErrorResponse::from_error(err));
187 }
188 }
189 }
190 }
191 }
192 Err(err) => {
193 return HttpResponse::Error(ErrorResponse::from_error(err));
194 }
195 }
196 }
197
198 HttpResponse::InfluxdbV1(InfluxdbV1Response {
199 results,
200 execution_time_ms: 0,
201 })
202 }
203
204 pub fn results(&self) -> &[InfluxdbOutput] {
205 &self.results
206 }
207
208 pub fn execution_time_ms(&self) -> u64 {
209 self.execution_time_ms
210 }
211}
212
213impl IntoResponse for InfluxdbV1Response {
214 fn into_response(self) -> Response {
215 let execution_time = self.execution_time_ms;
216 let mut resp = Json(self).into_response();
217 resp.headers_mut().insert(
218 &GREPTIME_DB_HEADER_FORMAT,
219 HeaderValue::from_static(ResponseFormat::InfluxdbV1.as_str()),
220 );
221 resp.headers_mut().insert(
222 &GREPTIME_DB_HEADER_EXECUTION_TIME,
223 HeaderValue::from(execution_time),
224 );
225 resp
226 }
227}