servers/http/result/
csv_result.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use axum::http::{header, HeaderValue};
16use axum::response::{IntoResponse, Response};
17use common_error::status_code::StatusCode;
18use common_query::Output;
19use mime_guess::mime;
20use serde::{Deserialize, Serialize};
21use serde_json::Value as JsonValue;
22
23use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
24use crate::http::result::error_result::ErrorResponse;
25use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
26
27#[derive(Serialize, Deserialize, Debug)]
28pub struct CsvResponse {
29    output: Vec<GreptimeQueryOutput>,
30    execution_time_ms: u64,
31    with_names: bool,
32    with_types: bool,
33}
34
35impl CsvResponse {
36    pub async fn from_output(
37        outputs: Vec<crate::error::Result<Output>>,
38        with_names: bool,
39        with_types: bool,
40    ) -> HttpResponse {
41        match handler::from_output(outputs).await {
42            Err(err) => HttpResponse::Error(err),
43            Ok((output, _)) => {
44                if output.len() > 1 {
45                    HttpResponse::Error(ErrorResponse::from_error_message(
46                        StatusCode::InvalidArguments,
47                        "cannot output multi-statements result in csv format".to_string(),
48                    ))
49                } else {
50                    let csv_resp = CsvResponse {
51                        output,
52                        execution_time_ms: 0,
53                        with_names: false,
54                        with_types: false,
55                    };
56
57                    HttpResponse::Csv(csv_resp.with_names(with_names).with_types(with_types))
58                }
59            }
60        }
61    }
62
63    pub fn output(&self) -> &[GreptimeQueryOutput] {
64        &self.output
65    }
66
67    pub fn with_execution_time(mut self, execution_time: u64) -> Self {
68        self.execution_time_ms = execution_time;
69        self
70    }
71
72    pub fn execution_time_ms(&self) -> u64 {
73        self.execution_time_ms
74    }
75
76    pub fn with_limit(mut self, limit: usize) -> Self {
77        self.output = process_with_limit(self.output, limit);
78        self
79    }
80
81    pub fn with_names(mut self, with_names: bool) -> Self {
82        self.with_names = with_names;
83        self
84    }
85
86    pub fn with_types(mut self, with_types: bool) -> Self {
87        self.with_types = with_types;
88
89        // If `with_type` is true, than always set `with_names` to be true.
90        if with_types {
91            self.with_names = true;
92        }
93        self
94    }
95}
96
/// Evaluates to the `Ok` value of the given expression. On `Err`, returns
/// early from the enclosing function with an `Unexpected` error response
/// whose message is the error's `to_string()` text.
macro_rules! http_try {
    ($result: expr) => {
        match $result {
            Err(cause) => {
                let error_response =
                    ErrorResponse::from_error_message(StatusCode::Unexpected, cause.to_string());
                return HttpResponse::Error(error_response).into_response();
            }
            Ok(value) => value,
        }
    };
}
112
113impl IntoResponse for CsvResponse {
114    fn into_response(mut self) -> Response {
115        debug_assert!(
116            self.output.len() <= 1,
117            "self.output has extra elements: {}",
118            self.output.len()
119        );
120
121        let execution_time = self.execution_time_ms;
122        let payload = match self.output.pop() {
123            None => String::default(),
124            Some(GreptimeQueryOutput::AffectedRows(n)) => {
125                format!("{n}\n")
126            }
127            Some(GreptimeQueryOutput::Records(records)) => {
128                let mut wtr = csv::WriterBuilder::new()
129                    .terminator(csv::Terminator::CRLF) // RFC 4180
130                    .from_writer(Vec::new());
131
132                if self.with_names {
133                    let names = records
134                        .schema
135                        .column_schemas
136                        .iter()
137                        .map(|c| &c.name)
138                        .collect::<Vec<_>>();
139                    http_try!(wtr.serialize(names));
140                }
141
142                if self.with_types {
143                    let types = records
144                        .schema
145                        .column_schemas
146                        .iter()
147                        .map(|c| &c.data_type)
148                        .collect::<Vec<_>>();
149                    http_try!(wtr.serialize(types));
150                }
151
152                for row in records.rows {
153                    let row = row
154                        .into_iter()
155                        .map(|value| {
156                            match value {
157                                // Cast array and object to string
158                                JsonValue::Array(a) => {
159                                    JsonValue::String(serde_json::to_string(&a).unwrap_or_default())
160                                }
161                                JsonValue::Object(o) => {
162                                    JsonValue::String(serde_json::to_string(&o).unwrap_or_default())
163                                }
164                                v => v,
165                            }
166                        })
167                        .collect::<Vec<_>>();
168
169                    http_try!(wtr.serialize(row));
170                }
171
172                http_try!(wtr.flush());
173
174                let bytes = http_try!(wtr.into_inner());
175                http_try!(String::from_utf8(bytes))
176            }
177        };
178
179        let mut resp = (
180            [(
181                header::CONTENT_TYPE,
182                HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
183            )],
184            payload,
185        )
186            .into_response();
187        resp.headers_mut().insert(
188            &GREPTIME_DB_HEADER_FORMAT,
189            HeaderValue::from_static(
190                ResponseFormat::Csv(self.with_names, self.with_types).as_str(),
191            ),
192        );
193        resp.headers_mut().insert(
194            &GREPTIME_DB_HEADER_EXECUTION_TIME,
195            HeaderValue::from(execution_time),
196        );
197        resp
198    }
199}
200
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::Output;
    use common_recordbatch::{RecordBatch, RecordBatches};
    use datatypes::prelude::{ConcreteDataType, ScalarVector};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BinaryVector, Float32Vector, StringVector, UInt32Vector, VectorRef};

    use super::*;

    /// Checks the CSV body for every combination of the `with_names` and
    /// `with_types` flags, including the case where requesting types forces
    /// the name row on.
    #[tokio::test]
    async fn test_csv_response_with_names_and_types() {
        let (schema, columns) = create_test_data();

        // The writer terminates records with CRLF, so normalise the literal.
        let data = r#"1,,-1000.1400146484375,"{""a"":{""b"":2},""b"":2,""c"":3}"
2,hello,1.9900000095367432,"{""a"":4,""b"":{""c"":6},""c"":6}""#
            .replace("\n", "\r\n");

        // Test with_names=true, with_types=true
        {
            let body = get_csv_body(&schema, &columns, true, true).await;
            assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n"));
            assert!(body.contains(&data));
        }

        // Test with_names=false, with_types=true: requesting types always
        // enables names as well.
        {
            let body = get_csv_body(&schema, &columns, false, true).await;
            assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n"));
            assert!(body.contains(&data));
        }

        // Test with_names=true, with_types=false
        {
            let body = get_csv_body(&schema, &columns, true, false).await;
            assert!(body.starts_with("col1,col2,col3,col4\r\n"));
            assert!(!body.contains("UInt32,String,Float32,Json"));
            assert!(body.contains(&data));
        }

        // Test with_names=false, with_types=false
        {
            let body = get_csv_body(&schema, &columns, false, false).await;
            assert!(!body.starts_with("col1,col2,col3,col4"));
            assert!(!body.contains("UInt32,String,Float32,Json"));
            assert!(body.contains(&data));
        }
    }

    /// Builds a four-column schema (uint32, string, float32, json) together
    /// with two rows of matching column vectors.
    fn create_test_data() -> (Arc<Schema>, Vec<VectorRef>) {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("col3", ConcreteDataType::float32_datatype(), true),
            ColumnSchema::new("col4", ConcreteDataType::json_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
        ];

        // The json column is backed by a binary vector of jsonb-encoded values.
        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2])),
            Arc::new(StringVector::from(vec![None, Some("hello")])),
            Arc::new(Float32Vector::from_slice(vec![-1000.14, 1.99])),
            Arc::new(BinaryVector::from_vec(jsonbs)),
        ];

        (schema, columns)
    }

    /// Runs the given columns through `CsvResponse::from_output` and returns
    /// the resulting HTTP body as a string.
    async fn get_csv_body(
        schema: &Arc<Schema>,
        columns: &[VectorRef],
        with_names: bool,
        with_types: bool,
    ) -> String {
        let recordbatch = RecordBatch::new(schema.clone(), columns.to_vec()).unwrap();
        let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap();
        let output = Output::new_with_record_batches(recordbatches);
        let outputs = vec![Ok(output)];

        let resp = CsvResponse::from_output(outputs, with_names, with_types)
            .await
            .into_response();
        let bytes = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap();
        String::from_utf8(bytes.to_vec()).unwrap()
    }
}