servers/http/
result.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow::array::AsArray;
16use arrow::datatypes::{
17    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
18    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
19    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
20};
21use arrow_schema::{DataType, IntervalUnit};
22use common_decimal::Decimal128;
23use common_recordbatch::RecordBatch;
24use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use snafu::ResultExt;
29
30use crate::error::{
31    ConvertScalarValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result, ToJsonSnafu,
32    UnexpectedResultSnafu,
33};
34
35pub(crate) mod arrow_result;
36pub(crate) mod csv_result;
37pub mod error_result;
38pub(crate) mod greptime_manage_resp;
39pub mod greptime_result_v1;
40pub mod influxdb_result_v1;
41pub(crate) mod json_result;
42pub(crate) mod null_result;
43pub(crate) mod prometheus_resp;
44pub(crate) mod table_result;
45
46pub struct HttpOutputWriter {
47    columns: usize,
48    value_transformer: Option<Box<dyn Fn(Value) -> Value>>,
49    current: Option<Vec<serde_json::Value>>,
50}
51
52impl HttpOutputWriter {
53    pub fn new(columns: usize, value_transformer: Option<Box<dyn Fn(Value) -> Value>>) -> Self {
54        Self {
55            columns,
56            value_transformer,
57            current: None,
58        }
59    }
60
61    fn write_bytes(&mut self, bytes: &[u8], datatype: &ConcreteDataType) -> Result<()> {
62        if datatype.is_json() {
63            let value = datatypes::types::jsonb_to_serde_json(bytes).map_err(|e| {
64                UnexpectedResultSnafu {
65                    reason: format!("corrupted jsonb data: {bytes:?}, error: {e}"),
66                }
67                .build()
68            })?;
69            self.push(value);
70            Ok(())
71        } else {
72            self.write_value(bytes)
73        }
74    }
75
76    fn write_value(&mut self, value: impl Into<Value>) -> Result<()> {
77        let value = value.into();
78
79        let value = if let Some(f) = &self.value_transformer {
80            f(value)
81        } else {
82            value
83        };
84
85        let value = serde_json::Value::try_from(value).context(ToJsonSnafu)?;
86        self.push(value);
87        Ok(())
88    }
89
90    fn push(&mut self, value: serde_json::Value) {
91        let current = self
92            .current
93            .get_or_insert_with(|| Vec::with_capacity(self.columns));
94        current.push(value);
95    }
96
97    fn finish(&mut self) -> Vec<serde_json::Value> {
98        self.current.take().unwrap_or_default()
99    }
100
101    pub fn write(
102        &mut self,
103        record_batch: RecordBatch,
104        rows: &mut Vec<Vec<serde_json::Value>>,
105    ) -> Result<()> {
106        let schema = record_batch.schema.clone();
107        let record_batch = record_batch.into_df_record_batch();
108        for i in 0..record_batch.num_rows() {
109            for (schema, array) in schema
110                .column_schemas()
111                .iter()
112                .zip(record_batch.columns().iter())
113            {
114                if array.is_null(i) {
115                    self.write_value(Value::Null)?;
116                    continue;
117                }
118
119                match array.data_type() {
120                    DataType::Null => {
121                        self.write_value(Value::Null)?;
122                    }
123                    DataType::Boolean => {
124                        let array = array.as_boolean();
125                        let v = array.value(i);
126                        self.write_value(v)?;
127                    }
128                    DataType::UInt8 => {
129                        let array = array.as_primitive::<UInt8Type>();
130                        let v = array.value(i);
131                        self.write_value(v)?;
132                    }
133                    DataType::UInt16 => {
134                        let array = array.as_primitive::<UInt16Type>();
135                        let v = array.value(i);
136                        self.write_value(v)?;
137                    }
138                    DataType::UInt32 => {
139                        let array = array.as_primitive::<UInt32Type>();
140                        let v = array.value(i);
141                        self.write_value(v)?;
142                    }
143                    DataType::UInt64 => {
144                        let array = array.as_primitive::<UInt64Type>();
145                        let v = array.value(i);
146                        self.write_value(v)?;
147                    }
148                    DataType::Int8 => {
149                        let array = array.as_primitive::<Int8Type>();
150                        let v = array.value(i);
151                        self.write_value(v)?;
152                    }
153                    DataType::Int16 => {
154                        let array = array.as_primitive::<Int16Type>();
155                        let v = array.value(i);
156                        self.write_value(v)?;
157                    }
158                    DataType::Int32 => {
159                        let array = array.as_primitive::<Int32Type>();
160                        let v = array.value(i);
161                        self.write_value(v)?;
162                    }
163                    DataType::Int64 => {
164                        let array = array.as_primitive::<Int64Type>();
165                        let v = array.value(i);
166                        self.write_value(v)?;
167                    }
168                    DataType::Float32 => {
169                        let array = array.as_primitive::<Float32Type>();
170                        let v = array.value(i);
171                        self.write_value(v)?;
172                    }
173                    DataType::Float64 => {
174                        let array = array.as_primitive::<Float64Type>();
175                        let v = array.value(i);
176                        self.write_value(v)?;
177                    }
178                    DataType::Utf8 => {
179                        let array = array.as_string::<i32>();
180                        let v = array.value(i);
181                        self.write_value(v)?;
182                    }
183                    DataType::LargeUtf8 => {
184                        let array = array.as_string::<i64>();
185                        let v = array.value(i);
186                        self.write_value(v)?;
187                    }
188                    DataType::Utf8View => {
189                        let array = array.as_string_view();
190                        let v = array.value(i);
191                        self.write_value(v)?;
192                    }
193                    DataType::Binary => {
194                        let array = array.as_binary::<i32>();
195                        let v = array.value(i);
196                        self.write_bytes(v, &schema.data_type)?;
197                    }
198                    DataType::LargeBinary => {
199                        let array = array.as_binary::<i64>();
200                        let v = array.value(i);
201                        self.write_bytes(v, &schema.data_type)?;
202                    }
203                    DataType::BinaryView => {
204                        let array = array.as_binary_view();
205                        let v = array.value(i);
206                        self.write_bytes(v, &schema.data_type)?;
207                    }
208                    DataType::Date32 => {
209                        let array = array.as_primitive::<Date32Type>();
210                        let v = Date::new(array.value(i));
211                        self.write_value(v)?;
212                    }
213                    DataType::Date64 => {
214                        let array = array.as_primitive::<Date64Type>();
215                        // `Date64` values are milliseconds representation of `Date32` values,
216                        // according to its specification. So we convert the `Date64` value here to
217                        // the `Date32` value to process them unified.
218                        let v = Date::new((array.value(i) / 86_400_000) as i32);
219                        self.write_value(v)?;
220                    }
221                    DataType::Timestamp(_, _) => {
222                        let ts = datatypes::arrow_array::timestamp_array_value(array, i);
223                        self.write_value(ts)?;
224                    }
225                    DataType::Time32(_) | DataType::Time64(_) => {
226                        let v = datatypes::arrow_array::time_array_value(array, i);
227                        self.write_value(v)?;
228                    }
229                    DataType::Interval(interval_unit) => match interval_unit {
230                        IntervalUnit::YearMonth => {
231                            let array = array.as_primitive::<IntervalYearMonthType>();
232                            let v: IntervalYearMonth = array.value(i).into();
233                            self.write_value(v)?;
234                        }
235                        IntervalUnit::DayTime => {
236                            let array = array.as_primitive::<IntervalDayTimeType>();
237                            let v: IntervalDayTime = array.value(i).into();
238                            self.write_value(v)?;
239                        }
240                        IntervalUnit::MonthDayNano => {
241                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
242                            let v: IntervalMonthDayNano = array.value(i).into();
243                            self.write_value(v)?;
244                        }
245                    },
246                    DataType::Duration(_) => {
247                        let d = datatypes::arrow_array::duration_array_value(array, i);
248                        self.write_value(d)?;
249                    }
250                    DataType::List(_) => {
251                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
252                        let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
253                        self.write_value(v)?;
254                    }
255                    DataType::Struct(_) => {
256                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
257                        let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
258                        self.write_value(v)?;
259                    }
260                    DataType::Decimal128(precision, scale) => {
261                        let array = array.as_primitive::<Decimal128Type>();
262                        let v = Decimal128::new(array.value(i), *precision, *scale);
263                        self.write_value(v)?;
264                    }
265                    _ => {
266                        return NotSupportedSnafu {
267                            feat: format!("convert {} to http output value", array.data_type()),
268                        }
269                        .fail();
270                    }
271                }
272            }
273
274            rows.push(self.finish())
275        }
276        Ok(())
277    }
278}