1use std::any::Any;
16use std::sync::Arc;
17
18use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef};
19use snafu::ResultExt;
20
21use crate::arrow_array::{BinaryArray, MutableBinaryArray};
22use crate::data_type::ConcreteDataType;
23use crate::error::{self, InvalidVectorSnafu, Result};
24use crate::scalars::{ScalarVector, ScalarVectorBuilder};
25use crate::serialize::Serializable;
26use crate::types::parse_string_to_vector_type_value;
27use crate::value::{Value, ValueRef};
28use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
29
/// Vector of binary values, stored in an arrow [`BinaryArray`].
#[derive(Debug, PartialEq)]
pub struct BinaryVector {
    /// Underlying arrow array holding the (nullable) byte strings.
    array: BinaryArray,
}
35
36impl BinaryVector {
37 pub(crate) fn as_arrow(&self) -> &dyn Array {
38 &self.array
39 }
40
41 pub fn convert_binary_to_json(&self) -> Result<BinaryVector> {
44 let arrow_array = self.to_arrow_array();
45 let mut vector = vec![];
46 for binary in arrow_array
47 .as_any()
48 .downcast_ref::<BinaryArray>()
49 .unwrap()
50 .iter()
51 {
52 let jsonb = if let Some(binary) = binary {
53 match jsonb::from_slice(binary) {
54 Ok(jsonb) => Some(jsonb.to_vec()),
55 Err(_) => {
56 let s = String::from_utf8_lossy(binary);
57 return error::InvalidJsonSnafu {
58 value: s.to_string(),
59 }
60 .fail();
61 }
62 }
63 } else {
64 None
65 };
66 vector.push(jsonb);
67 }
68 Ok(BinaryVector::from(vector))
69 }
70
71 pub fn convert_binary_to_vector(&self, dim: u32) -> Result<BinaryVector> {
72 let arrow_array = self.to_arrow_array();
73 let mut vector = vec![];
74 for binary in arrow_array
75 .as_any()
76 .downcast_ref::<BinaryArray>()
77 .unwrap()
78 .iter()
79 {
80 let Some(binary) = binary else {
81 vector.push(None);
82 continue;
83 };
84
85 if let Ok(s) = String::from_utf8(binary.to_vec()) {
86 if let Ok(v) = parse_string_to_vector_type_value(&s, Some(dim)) {
87 vector.push(Some(v));
88 continue;
89 }
90 }
91
92 let expected_bytes_size = dim as usize * std::mem::size_of::<f32>();
93 if binary.len() == expected_bytes_size {
94 vector.push(Some(binary.to_vec()));
95 continue;
96 } else {
97 return InvalidVectorSnafu {
98 msg: format!(
99 "Unexpected bytes size for vector value, expected {}, got {}",
100 expected_bytes_size,
101 binary.len()
102 ),
103 }
104 .fail();
105 }
106 }
107 Ok(BinaryVector::from(vector))
108 }
109}
110
111impl From<BinaryArray> for BinaryVector {
112 fn from(array: BinaryArray) -> Self {
113 Self { array }
114 }
115}
116
117impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
118 fn from(data: Vec<Option<Vec<u8>>>) -> Self {
119 Self {
120 array: BinaryArray::from_iter(data),
121 }
122 }
123}
124
125impl From<Vec<&[u8]>> for BinaryVector {
126 fn from(data: Vec<&[u8]>) -> Self {
127 Self {
128 array: BinaryArray::from_iter_values(data),
129 }
130 }
131}
132
133impl Vector for BinaryVector {
134 fn data_type(&self) -> ConcreteDataType {
135 ConcreteDataType::binary_datatype()
136 }
137
138 fn vector_type_name(&self) -> String {
139 "BinaryVector".to_string()
140 }
141
142 fn as_any(&self) -> &dyn Any {
143 self
144 }
145
146 fn len(&self) -> usize {
147 self.array.len()
148 }
149
150 fn to_arrow_array(&self) -> ArrayRef {
151 Arc::new(self.array.clone())
152 }
153
154 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
155 Box::new(self.array.clone())
156 }
157
158 fn validity(&self) -> Validity {
159 vectors::impl_validity_for_vector!(self.array)
160 }
161
162 fn memory_size(&self) -> usize {
163 self.array.get_buffer_memory_size()
164 }
165
166 fn null_count(&self) -> usize {
167 self.array.null_count()
168 }
169
170 fn is_null(&self, row: usize) -> bool {
171 self.array.is_null(row)
172 }
173
174 fn slice(&self, offset: usize, length: usize) -> VectorRef {
175 let array = self.array.slice(offset, length);
176 Arc::new(Self { array })
177 }
178
179 fn get(&self, index: usize) -> Value {
180 vectors::impl_get_for_vector!(self.array, index)
181 }
182
183 fn get_ref(&self, index: usize) -> ValueRef {
184 vectors::impl_get_ref_for_vector!(self.array, index)
185 }
186}
187
188impl From<Vec<Vec<u8>>> for BinaryVector {
189 fn from(data: Vec<Vec<u8>>) -> Self {
190 Self {
191 array: BinaryArray::from_iter_values(data),
192 }
193 }
194}
195
196impl ScalarVector for BinaryVector {
197 type OwnedItem = Vec<u8>;
198 type RefItem<'a> = &'a [u8];
199 type Iter<'a> = ArrayIter<&'a BinaryArray>;
200 type Builder = BinaryVectorBuilder;
201
202 fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
203 if self.array.is_valid(idx) {
204 Some(self.array.value(idx))
205 } else {
206 None
207 }
208 }
209
210 fn iter_data(&self) -> Self::Iter<'_> {
211 self.array.iter()
212 }
213}
214
/// Builder that accumulates binary values and produces a [`BinaryVector`].
pub struct BinaryVectorBuilder {
    /// Underlying mutable arrow array that receives the pushed values.
    mutable_array: MutableBinaryArray,
}
218
219impl MutableVector for BinaryVectorBuilder {
220 fn data_type(&self) -> ConcreteDataType {
221 ConcreteDataType::binary_datatype()
222 }
223
224 fn len(&self) -> usize {
225 self.mutable_array.len()
226 }
227
228 fn as_any(&self) -> &dyn Any {
229 self
230 }
231
232 fn as_mut_any(&mut self) -> &mut dyn Any {
233 self
234 }
235
236 fn to_vector(&mut self) -> VectorRef {
237 Arc::new(self.finish())
238 }
239
240 fn to_vector_cloned(&self) -> VectorRef {
241 Arc::new(self.finish_cloned())
242 }
243
244 fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
245 match value.as_binary()? {
246 Some(v) => self.mutable_array.append_value(v),
247 None => self.mutable_array.append_null(),
248 }
249 Ok(())
250 }
251
252 fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
253 vectors::impl_extend_for_builder!(self, vector, BinaryVector, offset, length)
254 }
255
256 fn push_null(&mut self) {
257 self.mutable_array.append_null()
258 }
259}
260
261impl ScalarVectorBuilder for BinaryVectorBuilder {
262 type VectorType = BinaryVector;
263
264 fn with_capacity(capacity: usize) -> Self {
265 Self {
266 mutable_array: MutableBinaryArray::with_capacity(capacity, 0),
267 }
268 }
269
270 fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
271 match value {
272 Some(v) => self.mutable_array.append_value(v),
273 None => self.mutable_array.append_null(),
274 }
275 }
276
277 fn finish(&mut self) -> Self::VectorType {
278 BinaryVector {
279 array: self.mutable_array.finish(),
280 }
281 }
282
283 fn finish_cloned(&self) -> Self::VectorType {
284 BinaryVector {
285 array: self.mutable_array.finish_cloned(),
286 }
287 }
288}
289
290impl Serializable for BinaryVector {
291 fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
292 self.iter_data()
293 .map(|v| match v {
294 None => Ok(serde_json::Value::Null), Some(vec) => serde_json::to_value(vec),
296 })
297 .collect::<serde_json::Result<_>>()
298 .context(error::SerializeSnafu)
299 }
300}
301
// NOTE(review): presumably generates the fallible conversion from an arrow array
// into `BinaryVector`; confirm against the macro definition in `vectors`.
vectors::impl_try_from_arrow_array_for_vector!(BinaryArray, BinaryVector);
303
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use arrow::datatypes::DataType as ArrowDataType;
    use common_base::bytes::Bytes;
    use serde_json;

    use super::*;
    use crate::arrow_array::BinaryArray;
    use crate::data_type::DataType;
    use crate::serialize::Serializable;
    use crate::types::BinaryType;

    /// Basic accessors on a small, fully valid vector.
    #[test]
    fn test_binary_vector_misc() {
        let v = BinaryVector::from(BinaryArray::from_iter_values([
            vec![1, 2, 3],
            vec![1, 2, 3],
        ]));

        assert_eq!(2, v.len());
        assert_eq!("BinaryVector", v.vector_type_name());
        assert!(!v.is_const());
        assert!(v.validity().is_all_valid());
        assert!(!v.only_null());
        assert_eq!(128, v.memory_size());

        for i in 0..2 {
            assert!(!v.is_null(i));
            assert_eq!(Value::Binary(Bytes::from(vec![1, 2, 3])), v.get(i));
            assert_eq!(ValueRef::Binary(&[1, 2, 3]), v.get_ref(i));
        }

        let arrow_arr = v.to_arrow_array();
        assert_eq!(2, arrow_arr.len());
        assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
    }

    #[test]
    fn test_serialize_binary_vector_to_json() {
        let vector = BinaryVector::from(BinaryArray::from_iter_values([
            vec![1, 2, 3],
            vec![1, 2, 3],
        ]));

        let json_value = vector.serialize_to_json().unwrap();
        assert_eq!(
            "[[1,2,3],[1,2,3]]",
            serde_json::to_string(&json_value).unwrap()
        );
    }

    #[test]
    fn test_serialize_binary_vector_with_null_to_json() {
        let mut builder = BinaryVectorBuilder::with_capacity(4);
        builder.push(Some(&[1, 2, 3]));
        builder.push(None);
        builder.push(Some(&[4, 5, 6]));
        let vector = builder.finish();

        let json_value = vector.serialize_to_json().unwrap();
        assert_eq!(
            "[[1,2,3],null,[4,5,6]]",
            serde_json::to_string(&json_value).unwrap()
        );
    }

    #[test]
    fn test_from_arrow_array() {
        let arrow_array = BinaryArray::from_iter_values([vec![1, 2, 3], vec![1, 2, 3]]);
        let original = BinaryArray::from(arrow_array.to_data());
        let vector = BinaryVector::from(arrow_array);
        assert_eq!(original, vector.array);
    }

    #[test]
    fn test_binary_vector_build_get() {
        let mut builder = BinaryVectorBuilder::with_capacity(4);
        builder.push(Some(b"hello"));
        builder.push(Some(b"happy"));
        builder.push(Some(b"world"));
        builder.push(None);

        let vector = builder.finish();
        assert_eq!(b"hello", vector.get_data(0).unwrap());
        assert_eq!(None, vector.get_data(3));

        assert_eq!(Value::Binary(b"hello".as_slice().into()), vector.get(0));
        assert_eq!(Value::Null, vector.get(3));

        let mut iter = vector.iter_data();
        assert_eq!(b"hello", iter.next().unwrap().unwrap());
        assert_eq!(b"happy", iter.next().unwrap().unwrap());
        assert_eq!(b"world", iter.next().unwrap().unwrap());
        assert_eq!(None, iter.next().unwrap());
        assert_eq!(None, iter.next());
    }

    #[test]
    fn test_binary_vector_validity() {
        let mut builder = BinaryVectorBuilder::with_capacity(4);
        builder.push(Some(b"hello"));
        builder.push(Some(b"world"));
        let vector = builder.finish();
        assert_eq!(0, vector.null_count());
        assert!(vector.validity().is_all_valid());

        let mut builder = BinaryVectorBuilder::with_capacity(3);
        builder.push(Some(b"hello"));
        builder.push(None);
        builder.push(Some(b"world"));
        let vector = builder.finish();
        assert_eq!(1, vector.null_count());
        let validity = vector.validity();
        assert_eq!(1, validity.null_count());
        // Only the middle row is null; its neighbours must be reported as set.
        assert!(validity.is_set(0));
        assert!(!validity.is_set(1));
        assert!(validity.is_set(2));
    }

    #[test]
    fn test_binary_vector_builder() {
        let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]);

        let mut builder = BinaryType.create_mutable_vector(3);
        builder.push_value_ref(ValueRef::Binary("hello".as_bytes()));
        assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
        builder.extend_slice_of(&input, 1, 2).unwrap();
        assert!(builder
            .extend_slice_of(&crate::vectors::Int32Vector::from_slice([13]), 0, 1)
            .is_err());
        let vector = builder.to_vector();

        let expect: VectorRef = Arc::new(BinaryVector::from_slice(&[b"hello", b"one", b"two"]));
        assert_eq!(expect, vector);
    }

    #[test]
    fn test_binary_vector_builder_finish_cloned() {
        let mut builder = BinaryVectorBuilder::with_capacity(1024);
        builder.push(Some(b"one"));
        builder.push(Some(b"two"));
        builder.push(Some(b"three"));
        let vector = builder.finish_cloned();
        assert_eq!(b"one", vector.get_data(0).unwrap());
        assert_eq!(vector.len(), 3);
        // `finish_cloned` must leave the builder's contents in place.
        assert_eq!(builder.len(), 3);

        builder.push(Some(b"four"));
        let vector = builder.finish_cloned();
        assert_eq!(b"four", vector.get_data(3).unwrap());
        assert_eq!(builder.len(), 4);
    }

    #[test]
    fn test_binary_json_conversion() {
        // JSON text is converted to the same bytes `jsonb::parse_value` produces.
        let json_strings = vec![
            b"{\"hello\": \"world\"}".to_vec(),
            b"{\"foo\": 1}".to_vec(),
            b"123".to_vec(),
        ];
        let json_vector = BinaryVector::from(json_strings.clone())
            .convert_binary_to_json()
            .unwrap();
        let jsonbs = json_strings
            .iter()
            .map(|v| jsonb::parse_value(v).unwrap().to_vec())
            .collect::<Vec<_>>();
        for i in 0..3 {
            assert_eq!(
                json_vector.get_ref(i).as_binary().unwrap().unwrap(),
                jsonbs.get(i).unwrap().as_slice()
            );
        }

        // Already-encoded jsonb bytes pass through unchanged.
        let json_vector = BinaryVector::from(jsonbs.clone())
            .convert_binary_to_json()
            .unwrap();
        for i in 0..3 {
            assert_eq!(
                json_vector.get_ref(i).as_binary().unwrap().unwrap(),
                jsonbs.get(i).unwrap().as_slice()
            );
        }

        // Arbitrary binary that merely starts like a jsonb header is rejected.
        let binary_with_jsonb_header: Vec<u8> = [0x80, 0x23, 0x40, 0x22].to_vec();
        let error = BinaryVector::from(vec![binary_with_jsonb_header])
            .convert_binary_to_json()
            .unwrap_err();
        assert_matches!(error, error::Error::InvalidJson { .. });

        // Truncated JSON text (missing the closing brace) is rejected.
        let json_strings = vec![b"{\"hello\": \"world\"".to_vec()];
        let error = BinaryVector::from(json_strings)
            .convert_binary_to_json()
            .unwrap_err();
        assert_matches!(error, error::Error::InvalidJson { .. });

        // Corrupted jsonb (last byte dropped) is rejected.
        let jsonb = jsonb::parse_value("{\"hello\": \"world\"}".as_bytes())
            .unwrap()
            .to_vec();
        let corrupted_jsonb = jsonb[0..jsonb.len() - 1].to_vec();
        let error = BinaryVector::from(vec![corrupted_jsonb])
            .convert_binary_to_json()
            .unwrap_err();
        assert_matches!(error, error::Error::InvalidJson { .. });
    }

    #[test]
    fn test_binary_vector_conversion() {
        let dim = 3;
        let vector = BinaryVector::from(vec![
            Some(b"[1,2,3]".to_vec()),
            Some(b"[4,5,6]".to_vec()),
            Some(b"[7,8,9]".to_vec()),
            None,
        ]);
        let expected = BinaryVector::from(vec![
            Some(
                [1.0f32, 2.0, 3.0]
                    .iter()
                    .flat_map(|v| v.to_le_bytes())
                    .collect(),
            ),
            Some(
                [4.0f32, 5.0, 6.0]
                    .iter()
                    .flat_map(|v| v.to_le_bytes())
                    .collect(),
            ),
            Some(
                [7.0f32, 8.0, 9.0]
                    .iter()
                    .flat_map(|v| v.to_le_bytes())
                    .collect(),
            ),
            None,
        ]);

        let converted = vector.convert_binary_to_vector(dim).unwrap();
        assert_eq!(converted.len(), expected.len());
        for i in 0..3 {
            assert_eq!(
                converted.get_ref(i).as_binary().unwrap().unwrap(),
                expected.get_ref(i).as_binary().unwrap().unwrap()
            );
        }
        // The null input row must stay null.
        assert!(converted.is_null(3));
        assert_eq!(1, converted.null_count());

        // Raw bytes of exactly `dim * size_of::<f32>()` are passed through unchanged.
        let raw: Vec<u8> = [1.0f32, 2.0, 3.0]
            .iter()
            .flat_map(|v| v.to_le_bytes())
            .collect();
        let converted = BinaryVector::from(vec![Some(raw.clone())])
            .convert_binary_to_vector(dim)
            .unwrap();
        assert_eq!(
            converted.get_ref(0).as_binary().unwrap().unwrap(),
            raw.as_slice()
        );

        // A value that is neither a parsable vector string of the right
        // dimension nor of the expected byte length is rejected.
        let error = BinaryVector::from(vec![Some(b"[1,2]".to_vec())])
            .convert_binary_to_vector(dim)
            .unwrap_err();
        assert_matches!(error, error::Error::InvalidVector { .. });
    }
}