servers/http/result/
csv_result.rs1use axum::http::{header, HeaderValue};
16use axum::response::{IntoResponse, Response};
17use common_error::status_code::StatusCode;
18use common_query::Output;
19use mime_guess::mime;
20use serde::{Deserialize, Serialize};
21
22use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
23use crate::http::result::error_result::ErrorResponse;
25use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
26
27#[derive(Serialize, Deserialize, Debug)]
28pub struct CsvResponse {
29 output: Vec<GreptimeQueryOutput>,
30 execution_time_ms: u64,
31}
32
33impl CsvResponse {
34 pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
35 match handler::from_output(outputs).await {
36 Err(err) => HttpResponse::Error(err),
37 Ok((output, _)) => {
38 if output.len() > 1 {
39 HttpResponse::Error(ErrorResponse::from_error_message(
40 StatusCode::InvalidArguments,
41 "cannot output multi-statements result in csv format".to_string(),
42 ))
43 } else {
44 HttpResponse::Csv(CsvResponse {
45 output,
46 execution_time_ms: 0,
47 })
48 }
49 }
50 }
51 }
52
53 pub fn output(&self) -> &[GreptimeQueryOutput] {
54 &self.output
55 }
56
57 pub fn with_execution_time(mut self, execution_time: u64) -> Self {
58 self.execution_time_ms = execution_time;
59 self
60 }
61
62 pub fn execution_time_ms(&self) -> u64 {
63 self.execution_time_ms
64 }
65
66 pub fn with_limit(mut self, limit: usize) -> Self {
67 self.output = process_with_limit(self.output, limit);
68 self
69 }
70}
71
72macro_rules! http_try {
73 ($handle: expr) => {
74 match $handle {
75 Ok(res) => res,
76 Err(err) => {
77 let msg = err.to_string();
78 return HttpResponse::Error(ErrorResponse::from_error_message(
79 StatusCode::Unexpected,
80 msg,
81 ))
82 .into_response();
83 }
84 }
85 };
86}
87
88impl IntoResponse for CsvResponse {
89 fn into_response(mut self) -> Response {
90 debug_assert!(
91 self.output.len() <= 1,
92 "self.output has extra elements: {}",
93 self.output.len()
94 );
95
96 let execution_time = self.execution_time_ms;
97 let payload = match self.output.pop() {
98 None => String::default(),
99 Some(GreptimeQueryOutput::AffectedRows(n)) => {
100 format!("{n}\n")
101 }
102 Some(GreptimeQueryOutput::Records(records)) => {
103 let mut wtr = csv::Writer::from_writer(Vec::new());
104
105 for row in records.rows {
106 http_try!(wtr.serialize(row));
107 }
108 http_try!(wtr.flush());
109
110 let bytes = http_try!(wtr.into_inner());
111 http_try!(String::from_utf8(bytes))
112 }
113 };
114
115 let mut resp = (
116 [(
117 header::CONTENT_TYPE,
118 HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
119 )],
120 payload,
121 )
122 .into_response();
123 resp.headers_mut().insert(
124 &GREPTIME_DB_HEADER_FORMAT,
125 HeaderValue::from_static(ResponseFormat::Csv.as_str()),
126 );
127 resp.headers_mut().insert(
128 &GREPTIME_DB_HEADER_EXECUTION_TIME,
129 HeaderValue::from(execution_time),
130 );
131 resp
132 }
133}