1use std::any::Any;
16use std::sync::Arc;
17
18use arrow::array::Array;
19use arrow::datatypes::Int64Type;
20use arrow_array::{ArrayRef, DictionaryArray, Int64Array};
21use serde_json::Value as JsonValue;
22use snafu::ResultExt;
23
24use crate::data_type::ConcreteDataType;
25use crate::error::{self, Result};
26use crate::serialize::Serializable;
27use crate::types::DictionaryType;
28use crate::value::{Value, ValueRef};
29use crate::vectors::operations::VectorOp;
30use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
31
32#[derive(Debug, PartialEq)]
34pub struct DictionaryVector {
35 array: DictionaryArray<Int64Type>,
36 item_type: ConcreteDataType,
38 item_vector: VectorRef,
40}
41
42impl DictionaryVector {
43 pub fn new(array: DictionaryArray<Int64Type>, item_type: ConcreteDataType) -> Result<Self> {
45 let item_vector = Helper::try_into_vector(array.values())?;
46
47 Ok(Self {
48 array,
49 item_type,
50 item_vector,
51 })
52 }
53
54 pub fn array(&self) -> &DictionaryArray<Int64Type> {
56 &self.array
57 }
58
59 pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int64Type> {
61 self.array.keys()
62 }
63
64 pub fn values(&self) -> &ArrayRef {
66 self.array.values()
67 }
68
69 pub fn as_arrow(&self) -> &dyn Array {
70 &self.array
71 }
72}
73
74impl Vector for DictionaryVector {
75 fn data_type(&self) -> ConcreteDataType {
76 ConcreteDataType::Dictionary(DictionaryType::new(
77 ConcreteDataType::int64_datatype(),
78 self.item_type.clone(),
79 ))
80 }
81
82 fn vector_type_name(&self) -> String {
83 "DictionaryVector".to_string()
84 }
85
86 fn as_any(&self) -> &dyn Any {
87 self
88 }
89
90 fn len(&self) -> usize {
91 self.array.len()
92 }
93
94 fn to_arrow_array(&self) -> ArrayRef {
95 Arc::new(self.array.clone())
96 }
97
98 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
99 Box::new(self.array.clone())
100 }
101
102 fn validity(&self) -> Validity {
103 vectors::impl_validity_for_vector!(self.array)
104 }
105
106 fn memory_size(&self) -> usize {
107 self.array.get_buffer_memory_size()
108 }
109
110 fn null_count(&self) -> usize {
111 self.array.null_count()
112 }
113
114 fn is_null(&self, row: usize) -> bool {
115 self.array.is_null(row)
116 }
117
118 fn slice(&self, offset: usize, length: usize) -> VectorRef {
119 Arc::new(Self {
120 array: self.array.slice(offset, length),
121 item_type: self.item_type.clone(),
122 item_vector: self.item_vector.clone(),
123 })
124 }
125
126 fn get(&self, index: usize) -> Value {
127 if !self.array.is_valid(index) {
128 return Value::Null;
129 }
130
131 let key = self.array.keys().value(index);
132 self.item_vector.get(key as usize)
133 }
134
135 fn get_ref(&self, index: usize) -> ValueRef {
136 if !self.array.is_valid(index) {
137 return ValueRef::Null;
138 }
139
140 let key = self.array.keys().value(index);
141 self.item_vector.get_ref(key as usize)
142 }
143}
144
145impl Serializable for DictionaryVector {
146 fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
147 let mut result = Vec::with_capacity(self.len());
150
151 for i in 0..self.len() {
152 if self.is_null(i) {
153 result.push(JsonValue::Null);
154 } else {
155 let key = self.array.keys().value(i);
156 let value = self.item_vector.get(key as usize);
157 let json_value = serde_json::to_value(value).context(error::SerializeSnafu)?;
158 result.push(json_value);
159 }
160 }
161
162 Ok(result)
163 }
164}
165
166impl TryFrom<DictionaryArray<Int64Type>> for DictionaryVector {
167 type Error = crate::error::Error;
168
169 fn try_from(array: DictionaryArray<Int64Type>) -> Result<Self> {
170 let item_type = ConcreteDataType::from_arrow_type(array.values().data_type());
171 let item_vector = Helper::try_into_vector(array.values())?;
172
173 Ok(Self {
174 array,
175 item_type,
176 item_vector,
177 })
178 }
179}
180
181pub struct DictionaryIter<'a> {
182 vector: &'a DictionaryVector,
183 idx: usize,
184}
185
186impl<'a> DictionaryIter<'a> {
187 pub fn new(vector: &'a DictionaryVector) -> DictionaryIter<'a> {
188 DictionaryIter { vector, idx: 0 }
189 }
190}
191
192impl<'a> Iterator for DictionaryIter<'a> {
193 type Item = Option<ValueRef<'a>>;
194
195 #[inline]
196 fn next(&mut self) -> Option<Self::Item> {
197 if self.idx >= self.vector.len() {
198 return None;
199 }
200
201 let idx = self.idx;
202 self.idx += 1;
203
204 if self.vector.is_null(idx) {
205 return Some(None);
206 }
207
208 Some(Some(self.vector.item_vector.get_ref(self.idx)))
209 }
210
211 #[inline]
212 fn size_hint(&self) -> (usize, Option<usize>) {
213 (
214 self.vector.len() - self.idx,
215 Some(self.vector.len() - self.idx),
216 )
217 }
218}
219
220impl VectorOp for DictionaryVector {
221 fn replicate(&self, offsets: &[usize]) -> VectorRef {
222 let keys = self.array.keys();
223 let mut replicated_keys = Vec::with_capacity(offsets.len());
224
225 let mut previous_offset = 0;
226 for (i, &offset) in offsets.iter().enumerate() {
227 let key = if i < self.len() {
228 if keys.is_valid(i) {
229 Some(keys.value(i))
230 } else {
231 None
232 }
233 } else {
234 None
235 };
236
237 let repeat_count = offset - previous_offset;
239 if repeat_count > 0 {
240 replicated_keys.resize(replicated_keys.len() + repeat_count, key);
241 }
242
243 previous_offset = offset;
244 }
245
246 let new_keys = Int64Array::from(replicated_keys);
247 let new_array = DictionaryArray::try_new(new_keys, self.values().clone())
248 .expect("Failed to create replicated dictionary array");
249
250 Arc::new(Self {
251 array: new_array,
252 item_type: self.item_type.clone(),
253 item_vector: self.item_vector.clone(),
254 })
255 }
256
257 fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
258 let key_array: ArrayRef = Arc::new(self.array.keys().clone());
259 let key_vector = Helper::try_into_vector(&key_array)?;
260 let filtered_key_vector = key_vector.filter(filter)?;
261 let filtered_key_array = filtered_key_vector.to_arrow_array();
262 let filtered_key_array = filtered_key_array
263 .as_any()
264 .downcast_ref::<Int64Array>()
265 .unwrap();
266
267 let new_array = DictionaryArray::try_new(filtered_key_array.clone(), self.values().clone())
268 .expect("Failed to create filtered dictionary array");
269
270 Ok(Arc::new(Self {
271 array: new_array,
272 item_type: self.item_type.clone(),
273 item_vector: self.item_vector.clone(),
274 }))
275 }
276
277 fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
278 let new_items = self.item_vector.cast(to_type)?;
279 let new_array =
280 DictionaryArray::try_new(self.array.keys().clone(), new_items.to_arrow_array())
281 .expect("Failed to create casted dictionary array");
282 Ok(Arc::new(Self {
283 array: new_array,
284 item_type: to_type.clone(),
285 item_vector: self.item_vector.clone(),
286 }))
287 }
288
289 fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
290 let key_array: ArrayRef = Arc::new(self.array.keys().clone());
291 let key_vector = Helper::try_into_vector(&key_array)?;
292 let new_key_vector = key_vector.take(indices)?;
293 let new_key_array = new_key_vector.to_arrow_array();
294 let new_key_array = new_key_array.as_any().downcast_ref::<Int64Array>().unwrap();
295
296 let new_array = DictionaryArray::try_new(new_key_array.clone(), self.values().clone())
297 .expect("Failed to create filtered dictionary array");
298
299 Ok(Arc::new(Self {
300 array: new_array,
301 item_type: self.item_type.clone(),
302 item_vector: self.item_vector.clone(),
303 }))
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use std::sync::Arc;
310
311 use arrow_array::StringArray;
312
313 use super::*;
314
315 fn create_test_dictionary() -> DictionaryVector {
317 let values = StringArray::from(vec!["a", "b", "c", "d"]);
321 let keys = Int64Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
322 let dict_array = DictionaryArray::new(keys, Arc::new(values));
323 DictionaryVector::try_from(dict_array).unwrap()
324 }
325
326 #[test]
327 fn test_dictionary_vector_basics() {
328 let dict_vec = create_test_dictionary();
329
330 assert_eq!(dict_vec.len(), 6);
332 assert_eq!(dict_vec.null_count(), 1);
333
334 let data_type = dict_vec.data_type();
336 if let ConcreteDataType::Dictionary(dict_type) = data_type {
337 assert_eq!(*dict_type.value_type(), ConcreteDataType::string_datatype());
338 } else {
339 panic!("Expected Dictionary data type");
340 }
341
342 assert!(!dict_vec.is_null(0));
344 assert!(dict_vec.is_null(3));
345
346 assert_eq!(dict_vec.get(0), Value::String("a".to_string().into()));
348 assert_eq!(dict_vec.get(1), Value::String("b".to_string().into()));
349 assert_eq!(dict_vec.get(3), Value::Null);
350 assert_eq!(dict_vec.get(4), Value::String("b".to_string().into()));
351 }
352
353 #[test]
354 fn test_slice() {
355 let dict_vec = create_test_dictionary();
356 let sliced = dict_vec.slice(1, 3);
357
358 assert_eq!(sliced.len(), 3);
359 assert_eq!(sliced.get(0), Value::String("b".to_string().into()));
360 assert_eq!(sliced.get(1), Value::String("c".to_string().into()));
361 assert_eq!(sliced.get(2), Value::Null);
362 }
363
364 #[test]
365 fn test_replicate() {
366 let dict_vec = create_test_dictionary();
367
368 let offsets = vec![0, 2, 5];
370 let replicated = dict_vec.replicate(&offsets);
371 assert_eq!(replicated.len(), 5);
372 assert_eq!(replicated.get(0), Value::String("b".to_string().into()));
373 assert_eq!(replicated.get(1), Value::String("b".to_string().into()));
374 assert_eq!(replicated.get(2), Value::String("c".to_string().into()));
375 assert_eq!(replicated.get(3), Value::String("c".to_string().into()));
376 assert_eq!(replicated.get(4), Value::String("c".to_string().into()));
377 }
378
379 #[test]
380 fn test_filter() {
381 let dict_vec = create_test_dictionary();
382
383 let filter_values = vec![true, false, true, false, true, false];
385 let filter = vectors::BooleanVector::from(filter_values);
386
387 let filtered = dict_vec.filter(&filter).unwrap();
388 assert_eq!(filtered.len(), 3);
389
390 assert_eq!(filtered.get(0), Value::String("a".to_string().into()));
392 assert_eq!(filtered.get(1), Value::String("c".to_string().into()));
393 assert_eq!(filtered.get(2), Value::String("b".to_string().into()));
394 }
395
396 #[test]
397 fn test_cast() {
398 let dict_vec = create_test_dictionary();
399
400 let casted = dict_vec.cast(&ConcreteDataType::string_datatype()).unwrap();
402
403 assert_eq!(
405 casted.data_type(),
406 ConcreteDataType::Dictionary(DictionaryType::new(
407 ConcreteDataType::int64_datatype(),
408 ConcreteDataType::string_datatype(),
409 ))
410 );
411 assert_eq!(casted.len(), dict_vec.len());
412
413 assert_eq!(casted.get(0), Value::String("a".to_string().into()));
415 assert_eq!(casted.get(1), Value::String("b".to_string().into()));
416 assert_eq!(casted.get(2), Value::String("c".to_string().into()));
417 assert_eq!(casted.get(3), Value::Null);
418 assert_eq!(casted.get(4), Value::String("b".to_string().into()));
419 assert_eq!(casted.get(5), Value::String("d".to_string().into()));
420 }
421
422 #[test]
423 fn test_take() {
424 let dict_vec = create_test_dictionary();
425
426 let indices_vec = vec![Some(2u32), Some(0), Some(4)];
428 let indices = vectors::UInt32Vector::from(indices_vec);
429
430 let taken = dict_vec.take(&indices).unwrap();
431 assert_eq!(taken.len(), 3);
432
433 assert_eq!(taken.get(0), Value::String("c".to_string().into()));
435 assert_eq!(taken.get(1), Value::String("a".to_string().into()));
436 assert_eq!(taken.get(2), Value::String("b".to_string().into()));
437 }
438}