use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use common_error::status_code::StatusCode;
use common_query::Output;
use mime_guess::mime;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};

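/// HTTP response that renders the output of a single query statement as CSV.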
#[derive(Serialize, Deserialize, Debug)]
pub struct CsvResponse {
    output: Vec<GreptimeQueryOutput>,
    execution_time_ms: u64,
    with_names: bool,
    with_types: bool,
}

impl CsvResponse {
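    /// Builds a `CsvResponse` from query outputs. Only the result of a single
    /// statement can be rendered as CSV; multiple results yield an error response.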
    pub async fn from_output(
        outputs: Vec<crate::error::Result<Output>>,
        with_names: bool,
        with_types: bool,
    ) -> HttpResponse {
        match handler::from_output(outputs).await {
            Err(err) => HttpResponse::Error(err),
            Ok((output, _)) => {
                if output.len() > 1 {
                    HttpResponse::Error(ErrorResponse::from_error_message(
                        StatusCode::InvalidArguments,
                        "cannot output multi-statements result in csv format".to_string(),
                    ))
                } else {
                    let csv_resp = CsvResponse {
                        output,
                        execution_time_ms: 0,
                        with_names: false,
                        with_types: false,
                    };

                    HttpResponse::Csv(csv_resp.with_names(with_names).with_types(with_types))
                }
            }
        }
    }

    pub fn output(&self) -> &[GreptimeQueryOutput] {
        &self.output
    }

    pub fn with_execution_time(mut self, execution_time: u64) -> Self {
        self.execution_time_ms = execution_time;
        self
    }

    pub fn execution_time_ms(&self) -> u64 {
        self.execution_time_ms
    }

    pub fn with_limit(mut self, limit: usize) -> Self {
        self.output = process_with_limit(self.output, limit);
        self
    }

    pub fn with_names(mut self, with_names: bool) -> Self {
        self.with_names = with_names;
        self
    }

    pub fn with_types(mut self, with_types: bool) -> Self {
        self.with_types = with_types;

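        // Emitting column types only makes sense together with column names.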
        if with_types {
            self.with_names = true;
        }
        self
    }
}

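/// Unwraps a `Result`, or converts the error into an error `Response` and returns early.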
macro_rules! http_try {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return HttpResponse::Error(ErrorResponse::from_error_message(
                    StatusCode::Unexpected,
                    msg,
                ))
                .into_response();
            }
        }
    };
}

impl IntoResponse for CsvResponse {
    fn into_response(mut self) -> Response {
        debug_assert!(
            self.output.len() <= 1,
            "self.output has extra elements: {}",
            self.output.len()
        );

        let execution_time = self.execution_time_ms;
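        // At most one statement output remains; render it into the CSV payload.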
        let payload = match self.output.pop() {
            None => String::default(),
            Some(GreptimeQueryOutput::AffectedRows(n)) => {
                format!("{n}\n")
            }
            Some(GreptimeQueryOutput::Records(records)) => {
                let mut wtr = csv::WriterBuilder::new()
                    .terminator(csv::Terminator::CRLF)
                    .from_writer(Vec::new());

                if self.with_names {
                    let names = records
                        .schema
                        .column_schemas
                        .iter()
                        .map(|c| &c.name)
                        .collect::<Vec<_>>();
                    http_try!(wtr.serialize(names));
                }

                if self.with_types {
                    let types = records
                        .schema
                        .column_schemas
                        .iter()
                        .map(|c| &c.data_type)
                        .collect::<Vec<_>>();
                    http_try!(wtr.serialize(types));
                }

                for row in records.rows {
                    let row = row
                        .into_iter()
                        .map(|value| {
                            match value {
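                                // Nested JSON arrays and objects are serialized to strings
                                // so that each value stays within a single CSV field.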
                                JsonValue::Array(a) => {
                                    JsonValue::String(serde_json::to_string(&a).unwrap_or_default())
                                }
                                JsonValue::Object(o) => {
                                    JsonValue::String(serde_json::to_string(&o).unwrap_or_default())
                                }
                                v => v,
                            }
                        })
                        .collect::<Vec<_>>();

                    http_try!(wtr.serialize(row));
                }

                http_try!(wtr.flush());

                let bytes = http_try!(wtr.into_inner());
                http_try!(String::from_utf8(bytes))
            }
        };

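        // Attach the content type plus the format and execution-time headers.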
        let mut resp = (
            [(
                header::CONTENT_TYPE,
                HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
            )],
            payload,
        )
            .into_response();
        resp.headers_mut().insert(
            &GREPTIME_DB_HEADER_FORMAT,
            HeaderValue::from_static(
                ResponseFormat::Csv(self.with_names, self.with_types).as_str(),
            ),
        );
        resp.headers_mut().insert(
            &GREPTIME_DB_HEADER_EXECUTION_TIME,
            HeaderValue::from(execution_time),
        );
        resp
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::Output;
    use common_recordbatch::{RecordBatch, RecordBatches};
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BinaryVector, Float32Vector, StringVector, UInt32Vector, VectorRef};

    use super::*;

    #[tokio::test]
    async fn test_csv_response_with_names_and_types() {
        let (schema, columns) = create_test_data();

        let data = r#"1,,-1000.1400146484375,"{""a"":{""b"":2},""b"":2,""c"":3}"
2,hello,1.9900000095367432,"{""a"":4,""b"":{""c"":6},""c"":6}""#
            .replace("\n", "\r\n");

        {
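            // with_names: true, with_types: true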
            let body = get_csv_body(&schema, &columns, true, true).await;
            assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n"));
            assert!(body.contains(&data));
        }

        {
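            // with_names: true, with_types: false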
            let body = get_csv_body(&schema, &columns, true, false).await;
            assert!(body.starts_with("col1,col2,col3,col4\r\n"));
            assert!(!body.contains("UInt32,String,Float32,Json"));
            assert!(body.contains(&data));
        }

        {
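            // with_names: false, with_types: false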
            let body = get_csv_body(&schema, &columns, false, false).await;
            assert!(!body.starts_with("col1,col2,col3,col4"));
            assert!(!body.contains("UInt32,String,Float32,Json"));
            assert!(body.contains(&data));
        }
    }

    fn create_test_data() -> (Arc<Schema>, Vec<VectorRef>) {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("col3", ConcreteDataType::float32_datatype(), true),
            ColumnSchema::new("col4", ConcreteDataType::json_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
        ];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2])),
            Arc::new(StringVector::from(vec![None, Some("hello")])),
            Arc::new(Float32Vector::from_slice(vec![-1000.14, 1.99])),
            Arc::new(BinaryVector::from_vec(jsonbs)),
        ];

        (schema, columns)
    }

    async fn get_csv_body(
        schema: &Arc<Schema>,
        columns: &[VectorRef],
        with_names: bool,
        with_types: bool,
    ) -> String {
        let recordbatch = RecordBatch::new(schema.clone(), columns.to_vec()).unwrap();
        let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap();
        let output = Output::new_with_record_batches(recordbatches);
        let outputs = vec![Ok(output)];

        let resp = CsvResponse::from_output(outputs, with_names, with_types)
            .await
            .into_response();
        let bytes = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap();
        String::from_utf8(bytes.to_vec()).unwrap()
    }
}