1use arrow::array::AsArray;
16use arrow::datatypes::{
17 Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
18 Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
19 UInt8Type, UInt16Type, UInt32Type, UInt64Type,
20};
21use arrow_schema::{DataType, IntervalUnit};
22use common_decimal::Decimal128;
23use common_recordbatch::RecordBatch;
24use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use snafu::ResultExt;
29
30use crate::error::{
31 ConvertScalarValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result, ToJsonSnafu,
32 UnexpectedResultSnafu,
33};
34
35pub(crate) mod arrow_result;
36pub(crate) mod csv_result;
37pub mod error_result;
38pub(crate) mod greptime_manage_resp;
39pub mod greptime_result_v1;
40pub mod influxdb_result_v1;
41pub(crate) mod json_result;
42pub(crate) mod null_result;
43pub(crate) mod prometheus_resp;
44pub(crate) mod table_result;
45
/// Turns record batches into rows of [`serde_json::Value`]s for HTTP output.
///
/// Cells are appended one at a time to an in-progress row buffer; once every
/// column of a row has been written the buffer is moved into the caller's
/// row list.
pub struct HttpOutputWriter {
    /// Capacity used when allocating a fresh row buffer. It is only a sizing
    /// hint: the number of cells actually written per row is determined by the
    /// record batch passed to `write`.
    columns: usize,
    /// Optional hook applied to every [`Value`] (including nulls) right before
    /// it is converted to JSON. Binary cells of JSON type are decoded directly
    /// and do not go through this hook.
    value_transformer: Option<Box<dyn Fn(Value) -> Value>>,
    /// The row currently being assembled; `None` between rows.
    current: Option<Vec<serde_json::Value>>,
}
51
52impl HttpOutputWriter {
53 pub fn new(columns: usize, value_transformer: Option<Box<dyn Fn(Value) -> Value>>) -> Self {
54 Self {
55 columns,
56 value_transformer,
57 current: None,
58 }
59 }
60
61 fn write_bytes(&mut self, bytes: &[u8], datatype: &ConcreteDataType) -> Result<()> {
62 if datatype.is_json() {
63 let value = datatypes::types::jsonb_to_serde_json(bytes).map_err(|e| {
64 UnexpectedResultSnafu {
65 reason: format!("corrupted jsonb data: {bytes:?}, error: {e}"),
66 }
67 .build()
68 })?;
69 self.push(value);
70 Ok(())
71 } else {
72 self.write_value(bytes)
73 }
74 }
75
76 fn write_value(&mut self, value: impl Into<Value>) -> Result<()> {
77 let value = value.into();
78
79 let value = if let Some(f) = &self.value_transformer {
80 f(value)
81 } else {
82 value
83 };
84
85 let value = serde_json::Value::try_from(value).context(ToJsonSnafu)?;
86 self.push(value);
87 Ok(())
88 }
89
90 fn push(&mut self, value: serde_json::Value) {
91 let current = self
92 .current
93 .get_or_insert_with(|| Vec::with_capacity(self.columns));
94 current.push(value);
95 }
96
97 fn finish(&mut self) -> Vec<serde_json::Value> {
98 self.current.take().unwrap_or_default()
99 }
100
101 pub fn write(
102 &mut self,
103 record_batch: RecordBatch,
104 rows: &mut Vec<Vec<serde_json::Value>>,
105 ) -> Result<()> {
106 let schema = record_batch.schema.clone();
107 let record_batch = record_batch.into_df_record_batch();
108 for i in 0..record_batch.num_rows() {
109 for (schema, array) in schema
110 .column_schemas()
111 .iter()
112 .zip(record_batch.columns().iter())
113 {
114 if array.is_null(i) {
115 self.write_value(Value::Null)?;
116 continue;
117 }
118
119 match array.data_type() {
120 DataType::Null => {
121 self.write_value(Value::Null)?;
122 }
123 DataType::Boolean => {
124 let array = array.as_boolean();
125 let v = array.value(i);
126 self.write_value(v)?;
127 }
128 DataType::UInt8 => {
129 let array = array.as_primitive::<UInt8Type>();
130 let v = array.value(i);
131 self.write_value(v)?;
132 }
133 DataType::UInt16 => {
134 let array = array.as_primitive::<UInt16Type>();
135 let v = array.value(i);
136 self.write_value(v)?;
137 }
138 DataType::UInt32 => {
139 let array = array.as_primitive::<UInt32Type>();
140 let v = array.value(i);
141 self.write_value(v)?;
142 }
143 DataType::UInt64 => {
144 let array = array.as_primitive::<UInt64Type>();
145 let v = array.value(i);
146 self.write_value(v)?;
147 }
148 DataType::Int8 => {
149 let array = array.as_primitive::<Int8Type>();
150 let v = array.value(i);
151 self.write_value(v)?;
152 }
153 DataType::Int16 => {
154 let array = array.as_primitive::<Int16Type>();
155 let v = array.value(i);
156 self.write_value(v)?;
157 }
158 DataType::Int32 => {
159 let array = array.as_primitive::<Int32Type>();
160 let v = array.value(i);
161 self.write_value(v)?;
162 }
163 DataType::Int64 => {
164 let array = array.as_primitive::<Int64Type>();
165 let v = array.value(i);
166 self.write_value(v)?;
167 }
168 DataType::Float32 => {
169 let array = array.as_primitive::<Float32Type>();
170 let v = array.value(i);
171 self.write_value(v)?;
172 }
173 DataType::Float64 => {
174 let array = array.as_primitive::<Float64Type>();
175 let v = array.value(i);
176 self.write_value(v)?;
177 }
178 DataType::Utf8 => {
179 let array = array.as_string::<i32>();
180 let v = array.value(i);
181 self.write_value(v)?;
182 }
183 DataType::LargeUtf8 => {
184 let array = array.as_string::<i64>();
185 let v = array.value(i);
186 self.write_value(v)?;
187 }
188 DataType::Utf8View => {
189 let array = array.as_string_view();
190 let v = array.value(i);
191 self.write_value(v)?;
192 }
193 DataType::Binary => {
194 let array = array.as_binary::<i32>();
195 let v = array.value(i);
196 self.write_bytes(v, &schema.data_type)?;
197 }
198 DataType::LargeBinary => {
199 let array = array.as_binary::<i64>();
200 let v = array.value(i);
201 self.write_bytes(v, &schema.data_type)?;
202 }
203 DataType::BinaryView => {
204 let array = array.as_binary_view();
205 let v = array.value(i);
206 self.write_bytes(v, &schema.data_type)?;
207 }
208 DataType::Date32 => {
209 let array = array.as_primitive::<Date32Type>();
210 let v = Date::new(array.value(i));
211 self.write_value(v)?;
212 }
213 DataType::Date64 => {
214 let array = array.as_primitive::<Date64Type>();
215 let v = Date::new((array.value(i) / 86_400_000) as i32);
219 self.write_value(v)?;
220 }
221 DataType::Timestamp(_, _) => {
222 let ts = datatypes::arrow_array::timestamp_array_value(array, i);
223 self.write_value(ts)?;
224 }
225 DataType::Time32(_) | DataType::Time64(_) => {
226 let v = datatypes::arrow_array::time_array_value(array, i);
227 self.write_value(v)?;
228 }
229 DataType::Interval(interval_unit) => match interval_unit {
230 IntervalUnit::YearMonth => {
231 let array = array.as_primitive::<IntervalYearMonthType>();
232 let v: IntervalYearMonth = array.value(i).into();
233 self.write_value(v)?;
234 }
235 IntervalUnit::DayTime => {
236 let array = array.as_primitive::<IntervalDayTimeType>();
237 let v: IntervalDayTime = array.value(i).into();
238 self.write_value(v)?;
239 }
240 IntervalUnit::MonthDayNano => {
241 let array = array.as_primitive::<IntervalMonthDayNanoType>();
242 let v: IntervalMonthDayNano = array.value(i).into();
243 self.write_value(v)?;
244 }
245 },
246 DataType::Duration(_) => {
247 let d = datatypes::arrow_array::duration_array_value(array, i);
248 self.write_value(d)?;
249 }
250 DataType::List(_) => {
251 let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
252 let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
253 self.write_value(v)?;
254 }
255 DataType::Struct(_) => {
256 let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
257 let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
258 self.write_value(v)?;
259 }
260 DataType::Decimal128(precision, scale) => {
261 let array = array.as_primitive::<Decimal128Type>();
262 let v = Decimal128::new(array.value(i), *precision, *scale);
263 self.write_value(v)?;
264 }
265 _ => {
266 return NotSupportedSnafu {
267 feat: format!("convert {} to http output value", array.data_type()),
268 }
269 .fail();
270 }
271 }
272 }
273
274 rows.push(self.finish())
275 }
276 Ok(())
277 }
278}