1use arrow::array::AsArray;
16use arrow::datatypes::{
17 Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
18 Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
19 UInt8Type, UInt16Type, UInt32Type, UInt64Type,
20};
21use arrow_schema::{DataType, IntervalUnit};
22use common_decimal::Decimal128;
23use common_recordbatch::RecordBatch;
24use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use snafu::ResultExt;
29
30use crate::error::{
31 ConvertScalarValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result, ToJsonSnafu,
32 UnexpectedResultSnafu,
33};
34
// Submodules declared by this module. `error_result`, `greptime_result_v1` and
// `influxdb_result_v1` are exported publicly; the rest are visible only inside this crate.
// NOTE(review): judging by their names, each submodule implements one HTTP response
// format — confirm against the submodule sources, which are not visible here.
pub(crate) mod arrow_result;
pub(crate) mod csv_result;
pub mod error_result;
pub(crate) mod greptime_manage_resp;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub(crate) mod json_result;
pub(crate) mod null_result;
pub(crate) mod prometheus_resp;
pub(crate) mod table_result;
45
/// Turns the cells of a record batch into rows of [`serde_json::Value`]s for HTTP output.
///
/// Cells are accumulated one at a time into an internal row buffer; a completed row is
/// taken out of the buffer and handed to the caller-provided row list.
pub struct HttpOutputWriter {
    /// Initial capacity used when a new row buffer is allocated.
    columns: usize,
    /// Optional hook applied to each [`Value`] right before it is converted to JSON.
    value_transformer: Option<Box<dyn Fn(Value) -> Value>>,
    /// Cells of the row currently being assembled; `None` until the first cell is pushed
    /// and again after the row has been taken.
    current: Option<Vec<serde_json::Value>>,
}
51
52impl HttpOutputWriter {
53 pub fn new(columns: usize, value_transformer: Option<Box<dyn Fn(Value) -> Value>>) -> Self {
54 Self {
55 columns,
56 value_transformer,
57 current: None,
58 }
59 }
60
61 fn write_bytes(&mut self, bytes: &[u8], datatype: &ConcreteDataType) -> Result<()> {
62 if datatype.is_json() {
63 let value = datatypes::types::jsonb_to_serde_json(bytes).map_err(|e| {
64 UnexpectedResultSnafu {
65 reason: format!("corrupted jsonb data: {bytes:?}, error: {e}"),
66 }
67 .build()
68 })?;
69 self.push(value);
70 Ok(())
71 } else {
72 self.write_value(bytes)
73 }
74 }
75
76 fn write_value(&mut self, value: impl Into<Value>) -> Result<()> {
77 let value = value.into();
78
79 let value = if let Some(f) = &self.value_transformer {
80 f(value)
81 } else {
82 value
83 };
84
85 let value = serde_json::Value::try_from(value).context(ToJsonSnafu)?;
86 self.push(value);
87 Ok(())
88 }
89
90 fn push(&mut self, value: serde_json::Value) {
91 let current = self
92 .current
93 .get_or_insert_with(|| Vec::with_capacity(self.columns));
94 current.push(value);
95 }
96
97 fn finish(&mut self) -> Vec<serde_json::Value> {
98 self.current.take().unwrap_or_default()
99 }
100
101 pub fn write(
102 &mut self,
103 record_batch: RecordBatch,
104 rows: &mut Vec<Vec<serde_json::Value>>,
105 ) -> Result<()> {
106 let schema = record_batch.schema.clone();
107 let record_batch = record_batch.into_df_record_batch();
108 for i in 0..record_batch.num_rows() {
109 for (schema, array) in schema
110 .column_schemas()
111 .iter()
112 .zip(record_batch.columns().iter())
113 {
114 if array.is_null(i) {
115 self.write_value(Value::Null)?;
116 continue;
117 }
118
119 match array.data_type() {
120 DataType::Null => {
121 self.write_value(Value::Null)?;
122 }
123 DataType::Boolean => {
124 let array = array.as_boolean();
125 let v = array.value(i);
126 self.write_value(v)?;
127 }
128 DataType::UInt8 => {
129 let array = array.as_primitive::<UInt8Type>();
130 let v = array.value(i);
131 self.write_value(v)?;
132 }
133 DataType::UInt16 => {
134 let array = array.as_primitive::<UInt16Type>();
135 let v = array.value(i);
136 self.write_value(v)?;
137 }
138 DataType::UInt32 => {
139 let array = array.as_primitive::<UInt32Type>();
140 let v = array.value(i);
141 self.write_value(v)?;
142 }
143 DataType::UInt64 => {
144 let array = array.as_primitive::<UInt64Type>();
145 let v = array.value(i);
146 self.write_value(v)?;
147 }
148 DataType::Int8 => {
149 let array = array.as_primitive::<Int8Type>();
150 let v = array.value(i);
151 self.write_value(v)?;
152 }
153 DataType::Int16 => {
154 let array = array.as_primitive::<Int16Type>();
155 let v = array.value(i);
156 self.write_value(v)?;
157 }
158 DataType::Int32 => {
159 let array = array.as_primitive::<Int32Type>();
160 let v = array.value(i);
161 self.write_value(v)?;
162 }
163 DataType::Int64 => {
164 let array = array.as_primitive::<Int64Type>();
165 let v = array.value(i);
166 self.write_value(v)?;
167 }
168 DataType::Float32 => {
169 let array = array.as_primitive::<Float32Type>();
170 let v = array.value(i);
171 self.write_value(v)?;
172 }
173 DataType::Float64 => {
174 let array = array.as_primitive::<Float64Type>();
175 let v = array.value(i);
176 self.write_value(v)?;
177 }
178 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
179 let v = datatypes::arrow_array::string_array_value(array, i);
180 self.write_value(v)?;
181 }
182 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
183 let v = datatypes::arrow_array::binary_array_value(array, i);
184 self.write_bytes(v, &schema.data_type)?;
185 }
186 DataType::Date32 => {
187 let array = array.as_primitive::<Date32Type>();
188 let v = Date::new(array.value(i));
189 self.write_value(v)?;
190 }
191 DataType::Date64 => {
192 let array = array.as_primitive::<Date64Type>();
193 let v = Date::new((array.value(i) / 86_400_000) as i32);
197 self.write_value(v)?;
198 }
199 DataType::Timestamp(_, _) => {
200 let ts = datatypes::arrow_array::timestamp_array_value(array, i);
201 self.write_value(ts)?;
202 }
203 DataType::Time32(_) | DataType::Time64(_) => {
204 let v = datatypes::arrow_array::time_array_value(array, i);
205 self.write_value(v)?;
206 }
207 DataType::Interval(interval_unit) => match interval_unit {
208 IntervalUnit::YearMonth => {
209 let array = array.as_primitive::<IntervalYearMonthType>();
210 let v: IntervalYearMonth = array.value(i).into();
211 self.write_value(v)?;
212 }
213 IntervalUnit::DayTime => {
214 let array = array.as_primitive::<IntervalDayTimeType>();
215 let v: IntervalDayTime = array.value(i).into();
216 self.write_value(v)?;
217 }
218 IntervalUnit::MonthDayNano => {
219 let array = array.as_primitive::<IntervalMonthDayNanoType>();
220 let v: IntervalMonthDayNano = array.value(i).into();
221 self.write_value(v)?;
222 }
223 },
224 DataType::Duration(_) => {
225 let d = datatypes::arrow_array::duration_array_value(array, i);
226 self.write_value(d)?;
227 }
228 DataType::List(_) => {
229 let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
230 let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
231 self.write_value(v)?;
232 }
233 DataType::Struct(_) => {
234 let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
235 let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
236 self.write_value(v)?;
237 }
238 DataType::Decimal128(precision, scale) => {
239 let array = array.as_primitive::<Decimal128Type>();
240 let v = Decimal128::new(array.value(i), *precision, *scale);
241 self.write_value(v)?;
242 }
243 _ => {
244 return NotSupportedSnafu {
245 feat: format!("convert {} to http output value", array.data_type()),
246 }
247 .fail();
248 }
249 }
250 }
251
252 rows.push(self.finish())
253 }
254 Ok(())
255 }
256}