servers/http/
result.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow::array::AsArray;
16use arrow::datatypes::{
17    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
18    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
19    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
20};
21use arrow_schema::{DataType, IntervalUnit};
22use common_decimal::Decimal128;
23use common_recordbatch::RecordBatch;
24use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use snafu::ResultExt;
29
30use crate::error::{
31    ConvertScalarValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result, ToJsonSnafu,
32    UnexpectedResultSnafu,
33};
34
35pub(crate) mod arrow_result;
36pub(crate) mod csv_result;
37pub mod error_result;
38pub(crate) mod greptime_manage_resp;
39pub mod greptime_result_v1;
40pub mod influxdb_result_v1;
41pub(crate) mod json_result;
42pub(crate) mod null_result;
43pub(crate) mod prometheus_resp;
44pub(crate) mod table_result;
45
46pub struct HttpOutputWriter {
47    columns: usize,
48    value_transformer: Option<Box<dyn Fn(Value) -> Value>>,
49    current: Option<Vec<serde_json::Value>>,
50}
51
52impl HttpOutputWriter {
53    pub fn new(columns: usize, value_transformer: Option<Box<dyn Fn(Value) -> Value>>) -> Self {
54        Self {
55            columns,
56            value_transformer,
57            current: None,
58        }
59    }
60
61    fn write_bytes(&mut self, bytes: &[u8], datatype: &ConcreteDataType) -> Result<()> {
62        if datatype.is_json() {
63            let value = datatypes::types::jsonb_to_serde_json(bytes).map_err(|e| {
64                UnexpectedResultSnafu {
65                    reason: format!("corrupted jsonb data: {bytes:?}, error: {e}"),
66                }
67                .build()
68            })?;
69            self.push(value);
70            Ok(())
71        } else {
72            self.write_value(bytes)
73        }
74    }
75
76    fn write_value(&mut self, value: impl Into<Value>) -> Result<()> {
77        let value = value.into();
78
79        let value = if let Some(f) = &self.value_transformer {
80            f(value)
81        } else {
82            value
83        };
84
85        let value = serde_json::Value::try_from(value).context(ToJsonSnafu)?;
86        self.push(value);
87        Ok(())
88    }
89
90    fn push(&mut self, value: serde_json::Value) {
91        let current = self
92            .current
93            .get_or_insert_with(|| Vec::with_capacity(self.columns));
94        current.push(value);
95    }
96
97    fn finish(&mut self) -> Vec<serde_json::Value> {
98        self.current.take().unwrap_or_default()
99    }
100
101    pub fn write(
102        &mut self,
103        record_batch: RecordBatch,
104        rows: &mut Vec<Vec<serde_json::Value>>,
105    ) -> Result<()> {
106        let schema = record_batch.schema.clone();
107        let record_batch = record_batch.into_df_record_batch();
108        for i in 0..record_batch.num_rows() {
109            for (schema, array) in schema
110                .column_schemas()
111                .iter()
112                .zip(record_batch.columns().iter())
113            {
114                if array.is_null(i) {
115                    self.write_value(Value::Null)?;
116                    continue;
117                }
118
119                match array.data_type() {
120                    DataType::Null => {
121                        self.write_value(Value::Null)?;
122                    }
123                    DataType::Boolean => {
124                        let array = array.as_boolean();
125                        let v = array.value(i);
126                        self.write_value(v)?;
127                    }
128                    DataType::UInt8 => {
129                        let array = array.as_primitive::<UInt8Type>();
130                        let v = array.value(i);
131                        self.write_value(v)?;
132                    }
133                    DataType::UInt16 => {
134                        let array = array.as_primitive::<UInt16Type>();
135                        let v = array.value(i);
136                        self.write_value(v)?;
137                    }
138                    DataType::UInt32 => {
139                        let array = array.as_primitive::<UInt32Type>();
140                        let v = array.value(i);
141                        self.write_value(v)?;
142                    }
143                    DataType::UInt64 => {
144                        let array = array.as_primitive::<UInt64Type>();
145                        let v = array.value(i);
146                        self.write_value(v)?;
147                    }
148                    DataType::Int8 => {
149                        let array = array.as_primitive::<Int8Type>();
150                        let v = array.value(i);
151                        self.write_value(v)?;
152                    }
153                    DataType::Int16 => {
154                        let array = array.as_primitive::<Int16Type>();
155                        let v = array.value(i);
156                        self.write_value(v)?;
157                    }
158                    DataType::Int32 => {
159                        let array = array.as_primitive::<Int32Type>();
160                        let v = array.value(i);
161                        self.write_value(v)?;
162                    }
163                    DataType::Int64 => {
164                        let array = array.as_primitive::<Int64Type>();
165                        let v = array.value(i);
166                        self.write_value(v)?;
167                    }
168                    DataType::Float32 => {
169                        let array = array.as_primitive::<Float32Type>();
170                        let v = array.value(i);
171                        self.write_value(v)?;
172                    }
173                    DataType::Float64 => {
174                        let array = array.as_primitive::<Float64Type>();
175                        let v = array.value(i);
176                        self.write_value(v)?;
177                    }
178                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
179                        let v = datatypes::arrow_array::string_array_value(array, i);
180                        self.write_value(v)?;
181                    }
182                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
183                        let v = datatypes::arrow_array::binary_array_value(array, i);
184                        self.write_bytes(v, &schema.data_type)?;
185                    }
186                    DataType::Date32 => {
187                        let array = array.as_primitive::<Date32Type>();
188                        let v = Date::new(array.value(i));
189                        self.write_value(v)?;
190                    }
191                    DataType::Date64 => {
192                        let array = array.as_primitive::<Date64Type>();
193                        // `Date64` values are milliseconds representation of `Date32` values,
194                        // according to its specification. So we convert the `Date64` value here to
195                        // the `Date32` value to process them unified.
196                        let v = Date::new((array.value(i) / 86_400_000) as i32);
197                        self.write_value(v)?;
198                    }
199                    DataType::Timestamp(_, _) => {
200                        let ts = datatypes::arrow_array::timestamp_array_value(array, i);
201                        self.write_value(ts)?;
202                    }
203                    DataType::Time32(_) | DataType::Time64(_) => {
204                        let v = datatypes::arrow_array::time_array_value(array, i);
205                        self.write_value(v)?;
206                    }
207                    DataType::Interval(interval_unit) => match interval_unit {
208                        IntervalUnit::YearMonth => {
209                            let array = array.as_primitive::<IntervalYearMonthType>();
210                            let v: IntervalYearMonth = array.value(i).into();
211                            self.write_value(v)?;
212                        }
213                        IntervalUnit::DayTime => {
214                            let array = array.as_primitive::<IntervalDayTimeType>();
215                            let v: IntervalDayTime = array.value(i).into();
216                            self.write_value(v)?;
217                        }
218                        IntervalUnit::MonthDayNano => {
219                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
220                            let v: IntervalMonthDayNano = array.value(i).into();
221                            self.write_value(v)?;
222                        }
223                    },
224                    DataType::Duration(_) => {
225                        let d = datatypes::arrow_array::duration_array_value(array, i);
226                        self.write_value(d)?;
227                    }
228                    DataType::List(_) => {
229                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
230                        let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
231                        self.write_value(v)?;
232                    }
233                    DataType::Struct(_) => {
234                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
235                        let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
236                        self.write_value(v)?;
237                    }
238                    DataType::Decimal128(precision, scale) => {
239                        let array = array.as_primitive::<Decimal128Type>();
240                        let v = Decimal128::new(array.value(i), *precision, *scale);
241                        self.write_value(v)?;
242                    }
243                    _ => {
244                        return NotSupportedSnafu {
245                            feat: format!("convert {} to http output value", array.data_type()),
246                        }
247                        .fail();
248                    }
249                }
250            }
251
252            rows.push(self.finish())
253        }
254        Ok(())
255    }
256}