servers/http/result/
csv_result.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use axum::http::{header, HeaderValue};
16use axum::response::{IntoResponse, Response};
17use common_error::status_code::StatusCode;
18use common_query::Output;
19use mime_guess::mime;
20use serde::{Deserialize, Serialize};
21
22use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
23// use super::process_with_limit;
24use crate::http::result::error_result::ErrorResponse;
25use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
26
27#[derive(Serialize, Deserialize, Debug)]
28pub struct CsvResponse {
29    output: Vec<GreptimeQueryOutput>,
30    execution_time_ms: u64,
31}
32
33impl CsvResponse {
34    pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
35        match handler::from_output(outputs).await {
36            Err(err) => HttpResponse::Error(err),
37            Ok((output, _)) => {
38                if output.len() > 1 {
39                    HttpResponse::Error(ErrorResponse::from_error_message(
40                        StatusCode::InvalidArguments,
41                        "cannot output multi-statements result in csv format".to_string(),
42                    ))
43                } else {
44                    HttpResponse::Csv(CsvResponse {
45                        output,
46                        execution_time_ms: 0,
47                    })
48                }
49            }
50        }
51    }
52
53    pub fn output(&self) -> &[GreptimeQueryOutput] {
54        &self.output
55    }
56
57    pub fn with_execution_time(mut self, execution_time: u64) -> Self {
58        self.execution_time_ms = execution_time;
59        self
60    }
61
62    pub fn execution_time_ms(&self) -> u64 {
63        self.execution_time_ms
64    }
65
66    pub fn with_limit(mut self, limit: usize) -> Self {
67        self.output = process_with_limit(self.output, limit);
68        self
69    }
70}
71
72macro_rules! http_try {
73    ($handle: expr) => {
74        match $handle {
75            Ok(res) => res,
76            Err(err) => {
77                let msg = err.to_string();
78                return HttpResponse::Error(ErrorResponse::from_error_message(
79                    StatusCode::Unexpected,
80                    msg,
81                ))
82                .into_response();
83            }
84        }
85    };
86}
87
88impl IntoResponse for CsvResponse {
89    fn into_response(mut self) -> Response {
90        debug_assert!(
91            self.output.len() <= 1,
92            "self.output has extra elements: {}",
93            self.output.len()
94        );
95
96        let execution_time = self.execution_time_ms;
97        let payload = match self.output.pop() {
98            None => String::default(),
99            Some(GreptimeQueryOutput::AffectedRows(n)) => {
100                format!("{n}\n")
101            }
102            Some(GreptimeQueryOutput::Records(records)) => {
103                let mut wtr = csv::Writer::from_writer(Vec::new());
104
105                for row in records.rows {
106                    http_try!(wtr.serialize(row));
107                }
108                http_try!(wtr.flush());
109
110                let bytes = http_try!(wtr.into_inner());
111                http_try!(String::from_utf8(bytes))
112            }
113        };
114
115        let mut resp = (
116            [(
117                header::CONTENT_TYPE,
118                HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
119            )],
120            payload,
121        )
122            .into_response();
123        resp.headers_mut().insert(
124            &GREPTIME_DB_HEADER_FORMAT,
125            HeaderValue::from_static(ResponseFormat::Csv.as_str()),
126        );
127        resp.headers_mut().insert(
128            &GREPTIME_DB_HEADER_EXECUTION_TIME,
129            HeaderValue::from(execution_time),
130        );
131        resp
132    }
133}