1use api::helper::{convert_month_day_nano_to_pb, convert_to_pb_decimal128};
16use api::v1::column::Values;
17use common_base::BitVec;
18use datatypes::types::{IntervalType, TimeType, TimestampType, WrapperType};
19use datatypes::vectors::{
20 BinaryVector, BooleanVector, DateVector, Decimal128Vector, Float32Vector, Float64Vector,
21 Int16Vector, Int32Vector, Int64Vector, Int8Vector, IntervalDayTimeVector,
22 IntervalMonthDayNanoVector, IntervalYearMonthVector, StringVector, TimeMicrosecondVector,
23 TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
24 TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector,
25 UInt32Vector, UInt64Vector, UInt8Vector, VectorRef,
26};
27use snafu::OptionExt;
28
29use crate::error::{ConversionSnafu, Result};
30
31pub fn null_mask(arrays: &[VectorRef], row_count: usize) -> Vec<u8> {
32 let null_count: usize = arrays.iter().map(|a| a.null_count()).sum();
33
34 if null_count == 0 {
35 return Vec::default();
36 }
37
38 let mut null_mask = BitVec::with_capacity(row_count);
39 for array in arrays {
40 let validity = array.validity();
41 if validity.is_all_valid() {
42 null_mask.extend_from_bitslice(&BitVec::repeat(false, array.len()));
43 } else {
44 for i in 0..array.len() {
45 null_mask.push(!validity.is_set(i));
46 }
47 }
48 }
49 null_mask.into_vec()
50}
51
/// Converts a slice of vectors that share `$data_type` into a gRPC `Values` message.
///
/// Each `($Type, $CastType, $field, $MapFunction)` tuple supplies:
/// - `$Type`: a `ConcreteDataType` pattern;
/// - `$CastType`: the concrete vector type every element of `$arrays` is downcast to;
/// - `$field`: the `Values` field that receives the data;
/// - `$MapFunction`: a closure converting each non-null element to its protobuf form.
///
/// Null slots are dropped here. Nullness is presumably carried separately, e.g. via
/// `null_mask`; NOTE(review): confirm with callers.
///
/// The expansion evaluates to `Result<Values>`. It short-circuits with `?` and a
/// `ConversionSnafu` error when a vector cannot be downcast, for example when the arrays mix
/// types. Types that are not sent over gRPC hit `unreachable!`.
macro_rules! convert_arrow_array_to_grpc_vals {
    ($data_type: expr, $arrays: ident, $(($Type: pat, $CastType: ty, $field: ident, $MapFunction: expr)), +) => {{
        use datatypes::data_type::ConcreteDataType;
        use datatypes::prelude::ScalarVector;

        match $data_type {
            $(
                $Type => {
                    let mut vals = Values::default();
                    for array in $arrays {
                        let array = array.as_any().downcast_ref::<$CastType>().with_context(|| ConversionSnafu {
                            from: format!("{:?}", $data_type),
                        })?;
                        // Extend straight from the iterator; no intermediate Vec per array.
                        vals.$field.extend(array
                            .iter_data()
                            .filter_map(|i| i.map($MapFunction)));
                    }
                    // Evaluate to the result rather than `return`, so the macro is a plain
                    // expression and not only valid as a function's tail.
                    Ok(vals)
                },
            )+
            ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
        }
    }};
}
77
78pub fn values(arrays: &[VectorRef]) -> Result<Values> {
79 if arrays.is_empty() {
80 return Ok(Values::default());
81 }
82 let data_type = arrays[0].data_type();
83
84 convert_arrow_array_to_grpc_vals!(
85 data_type,
86 arrays,
87 (
88 ConcreteDataType::Boolean(_),
89 BooleanVector,
90 bool_values,
91 |x| { x }
92 ),
93 (ConcreteDataType::Int8(_), Int8Vector, i8_values, |x| {
94 i32::from(x)
95 }),
96 (ConcreteDataType::Int16(_), Int16Vector, i16_values, |x| {
97 i32::from(x)
98 }),
99 (ConcreteDataType::Int32(_), Int32Vector, i32_values, |x| {
100 x
101 }),
102 (ConcreteDataType::Int64(_), Int64Vector, i64_values, |x| {
103 x
104 }),
105 (ConcreteDataType::UInt8(_), UInt8Vector, u8_values, |x| {
106 u32::from(x)
107 }),
108 (ConcreteDataType::UInt16(_), UInt16Vector, u16_values, |x| {
109 u32::from(x)
110 }),
111 (ConcreteDataType::UInt32(_), UInt32Vector, u32_values, |x| {
112 x
113 }),
114 (ConcreteDataType::UInt64(_), UInt64Vector, u64_values, |x| {
115 x
116 }),
117 (
118 ConcreteDataType::Float32(_),
119 Float32Vector,
120 f32_values,
121 |x| { x }
122 ),
123 (
124 ConcreteDataType::Float64(_),
125 Float64Vector,
126 f64_values,
127 |x| { x }
128 ),
129 (
130 ConcreteDataType::Binary(_),
131 BinaryVector,
132 binary_values,
133 |x| { x.into() }
134 ),
135 (
136 ConcreteDataType::String(_),
137 StringVector,
138 string_values,
139 |x| { x.into() }
140 ),
141 (ConcreteDataType::Date(_), DateVector, date_values, |x| {
142 x.val()
143 }),
144 (
145 ConcreteDataType::Timestamp(TimestampType::Second(_)),
146 TimestampSecondVector,
147 timestamp_second_values,
148 |x| { x.into_native() }
149 ),
150 (
151 ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
152 TimestampMillisecondVector,
153 timestamp_millisecond_values,
154 |x| { x.into_native() }
155 ),
156 (
157 ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
158 TimestampMicrosecondVector,
159 timestamp_microsecond_values,
160 |x| { x.into_native() }
161 ),
162 (
163 ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
164 TimestampNanosecondVector,
165 timestamp_nanosecond_values,
166 |x| { x.into_native() }
167 ),
168 (
169 ConcreteDataType::Time(TimeType::Second(_)),
170 TimeSecondVector,
171 time_second_values,
172 |x| { x.into_native() as i64 }
173 ),
174 (
175 ConcreteDataType::Time(TimeType::Millisecond(_)),
176 TimeMillisecondVector,
177 time_millisecond_values,
178 |x| { x.into_native() as i64 }
179 ),
180 (
181 ConcreteDataType::Time(TimeType::Microsecond(_)),
182 TimeMicrosecondVector,
183 time_microsecond_values,
184 |x| { x.into_native() }
185 ),
186 (
187 ConcreteDataType::Time(TimeType::Nanosecond(_)),
188 TimeNanosecondVector,
189 time_nanosecond_values,
190 |x| { x.into_native() }
191 ),
192 (
193 ConcreteDataType::Interval(IntervalType::YearMonth(_)),
194 IntervalYearMonthVector,
195 interval_year_month_values,
196 |x| { x.into_native() }
197 ),
198 (
199 ConcreteDataType::Interval(IntervalType::DayTime(_)),
200 IntervalDayTimeVector,
201 interval_day_time_values,
202 |x| { x.to_i64() }
203 ),
204 (
205 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)),
206 IntervalMonthDayNanoVector,
207 interval_month_day_nano_values,
208 |x| { convert_month_day_nano_to_pb(x) }
209 ),
210 (
211 ConcreteDataType::Decimal128(_),
212 Decimal128Vector,
213 decimal128_values,
214 |x| { convert_to_pb_decimal128(x) }
215 ),
216 (
217 ConcreteDataType::Vector(_),
218 BinaryVector,
219 binary_values,
220 |x| { x.into() }
221 )
222 )
223}
224
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano};

    use super::*;

    #[test]
    fn test_convert_arrow_arrays_i32() {
        let array = Int32Vector::from(vec![Some(1), Some(2), None, Some(3)]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert_eq!(vec![1, 2, 3], values.i32_values);
    }

    #[test]
    fn test_convert_arrow_arrays_multiple_i64() {
        // Non-null values from every input vector are concatenated in order.
        let a1: VectorRef = Arc::new(Int64Vector::from(vec![Some(1), None]));
        let a2: VectorRef = Arc::new(Int64Vector::from(vec![Some(2), Some(3)]));

        let values = values(&[a1, a2]).unwrap();

        assert_eq!(vec![1, 2, 3], values.i64_values);
    }

    #[test]
    fn test_convert_arrow_arrays_no_input() {
        let values = values(&[]).unwrap();

        assert_eq!(Values::default(), values);
    }

    #[test]
    fn test_convert_arrow_arrays_mismatched_types() {
        // The target type comes from the first vector; a later vector of another type must
        // surface as a conversion error instead of being silently skipped.
        let a1: VectorRef = Arc::new(Int32Vector::from(vec![Some(1)]));
        let a2: VectorRef = Arc::new(StringVector::from(vec![Some("x".to_string())]));

        assert!(values(&[a1, a2]).is_err());
    }

    #[test]
    fn test_convert_arrow_array_time_second() {
        let array = TimeSecondVector::from(vec![Some(1), Some(2), None, Some(3)]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert_eq!(vec![1, 2, 3], values.time_second_values);
    }

    #[test]
    fn test_convert_arrow_array_interval_year_month() {
        let array = IntervalYearMonthVector::from(vec![Some(1), Some(2), None, Some(3)]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert_eq!(vec![1, 2, 3], values.interval_year_month_values);
    }

    #[test]
    fn test_convert_arrow_array_interval_day_time() {
        let array = IntervalDayTimeVector::from(vec![
            Some(IntervalDayTime::new(0, 1)),
            Some(IntervalDayTime::new(0, 2)),
            None,
            Some(IntervalDayTime::new(0, 3)),
        ]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert_eq!(vec![1, 2, 3], values.interval_day_time_values);
    }

    #[test]
    fn test_convert_arrow_array_interval_month_day_nano() {
        let array = IntervalMonthDayNanoVector::from(vec![
            Some(IntervalMonthDayNano::new(0, 0, 1)),
            Some(IntervalMonthDayNano::new(0, 0, 2)),
            None,
            Some(IntervalMonthDayNano::new(0, 0, 3)),
        ]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        // The null slot must be dropped, leaving exactly three entries.
        assert_eq!(3, values.interval_month_day_nano_values.len());
        (0..3).for_each(|i| {
            assert_eq!(values.interval_month_day_nano_values[i].months, 0);
            assert_eq!(values.interval_month_day_nano_values[i].days, 0);
            assert_eq!(
                values.interval_month_day_nano_values[i].nanoseconds,
                i as i64 + 1
            );
        })
    }

    #[test]
    fn test_convert_arrow_array_decimal128() {
        let array = Decimal128Vector::from(vec![Some(1), Some(2), None, Some(3)]);

        let vals = values(&[Arc::new(array)]).unwrap();
        // The null slot must be dropped, leaving exactly three entries.
        assert_eq!(3, vals.decimal128_values.len());
        (0..3).for_each(|i| {
            assert_eq!(vals.decimal128_values[i].hi, 0);
            assert_eq!(vals.decimal128_values[i].lo, i as i64 + 1);
        });
    }

    #[test]
    fn test_convert_arrow_arrays_string() {
        let array = StringVector::from(vec![
            Some("1".to_string()),
            Some("2".to_string()),
            None,
            Some("3".to_string()),
            None,
        ]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert_eq!(vec!["1", "2", "3"], values.string_values);
    }

    #[test]
    fn test_convert_arrow_arrays_bool() {
        let array = BooleanVector::from(vec![Some(true), Some(false), None, Some(false), None]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert_eq!(vec![true, false, false], values.bool_values);
    }

    #[test]
    fn test_convert_arrow_arrays_empty() {
        let array = BooleanVector::from(vec![None, None, None, None, None]);
        let array: VectorRef = Arc::new(array);

        let values = values(&[array]).unwrap();

        assert!(values.bool_values.is_empty());
    }

    #[test]
    fn test_null_mask() {
        let a1: VectorRef = Arc::new(Int32Vector::from(vec![None, Some(2), None]));
        let a2: VectorRef = Arc::new(Int32Vector::from(vec![Some(1), Some(2), None, Some(4)]));
        let mask = null_mask(&[a1, a2], 3 + 4);
        assert_eq!(vec![0b0010_0101], mask);

        let empty: VectorRef = Arc::new(Int32Vector::from(vec![None, None, None]));
        let mask = null_mask(&[empty.clone(), empty.clone(), empty], 9);
        assert_eq!(vec![0b1111_1111, 0b0000_0001], mask);

        let a1: VectorRef = Arc::new(Int32Vector::from(vec![Some(1), Some(2), Some(3)]));
        let a2: VectorRef = Arc::new(Int32Vector::from(vec![Some(4), Some(5), Some(6)]));
        let mask = null_mask(&[a1, a2], 3 + 3);
        assert_eq!(Vec::<u8>::default(), mask);

        let a1: VectorRef = Arc::new(Int32Vector::from(vec![Some(1), Some(2), Some(3)]));
        let a2: VectorRef = Arc::new(Int32Vector::from(vec![Some(4), Some(5), None]));
        let mask = null_mask(&[a1, a2], 3 + 3);
        assert_eq!(vec![0b0010_0000], mask);

        // No arrays at all means no nulls, hence the empty-mask shortcut.
        assert!(null_mask(&[], 0).is_empty());
    }
}