1use std::any::Any;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use arrow_array::builder::{ArrayBuilder, Decimal128Builder};
20use arrow_array::iterator::ArrayIter;
21use arrow_array::{Array, ArrayRef, Decimal128Array};
22use common_decimal::Decimal128;
23use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
24use snafu::{OptionExt, ResultExt};
25
26use crate::arrow::datatypes::DataType as ArrowDataType;
27use crate::data_type::ConcreteDataType;
28use crate::error::{
29 self, CastTypeSnafu, InvalidPrecisionOrScaleSnafu, Result, ValueExceedsPrecisionSnafu,
30};
31use crate::prelude::{ScalarVector, ScalarVectorBuilder};
32use crate::serialize::Serializable;
33use crate::value::{Value, ValueRef};
34use crate::vectors;
35use crate::vectors::{MutableVector, Validity, Vector, VectorRef};
36
/// A vector of 128-bit decimal values backed by an Arrow [`Decimal128Array`].
///
/// The precision and scale shared by all elements are read from the wrapped
/// array (see [`Decimal128Vector::precision`] and [`Decimal128Vector::scale`]).
#[derive(Debug, PartialEq)]
pub struct Decimal128Vector {
    /// The wrapped Arrow array holding the raw `i128` values.
    array: Decimal128Array,
}
42
43impl Decimal128Vector {
44 pub fn new(array: Decimal128Array) -> Self {
46 Self { array }
47 }
48
49 pub fn from_values<I: IntoIterator<Item = i128>>(iter: I) -> Self {
51 Self {
52 array: Decimal128Array::from_iter_values(iter),
53 }
54 }
55
56 pub fn from_slice<P: AsRef<[i128]>>(slice: P) -> Self {
58 let iter = slice.as_ref().iter().copied();
59 Self {
60 array: Decimal128Array::from_iter_values(iter),
61 }
62 }
63
64 pub fn from_wrapper_slice<P: AsRef<[Decimal128]>>(slice: P) -> Self {
66 let iter = slice.as_ref().iter().copied().map(|v| v.val());
67 Self {
68 array: Decimal128Array::from_iter_values(iter),
69 }
70 }
71
72 pub fn get_slice(&self, offset: usize, length: usize) -> Self {
74 let array = self.array.slice(offset, length);
75 Self { array }
76 }
77
78 pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self> {
86 let array = self
87 .array
88 .with_precision_and_scale(precision, scale)
89 .context(InvalidPrecisionOrScaleSnafu { precision, scale })?;
90 Ok(Self { array })
91 }
92
93 pub fn with_precision_and_scale_to_null(self, precision: u8, scale: i8) -> Result<Self> {
100 self.null_if_overflow_precision(precision)
101 .with_precision_and_scale(precision, scale)
102 }
103
104 pub fn value_as_string(&self, idx: usize) -> String {
106 self.array.value_as_string(idx)
107 }
108
109 pub fn precision(&self) -> u8 {
111 self.array.precision()
112 }
113
114 pub fn scale(&self) -> i8 {
116 self.array.scale()
117 }
118
119 pub(crate) fn as_arrow(&self) -> &dyn Array {
121 &self.array
122 }
123
124 pub fn validate_decimal_precision(&self, precision: u8) -> Result<()> {
126 self.array
127 .validate_decimal_precision(precision)
128 .context(ValueExceedsPrecisionSnafu { precision })
129 }
130
131 fn null_if_overflow_precision(&self, precision: u8) -> Self {
133 Self {
134 array: self.array.null_if_overflow_precision(precision),
135 }
136 }
137
138 fn get_decimal128_value_from_array(&self, index: usize) -> Option<Decimal128> {
140 if self.array.is_valid(index) {
141 let value = unsafe { self.array.value_unchecked(index) };
143 Some(Decimal128::new(value, self.precision(), self.scale()))
145 } else {
146 None
147 }
148 }
149}
150
151impl Vector for Decimal128Vector {
152 fn data_type(&self) -> ConcreteDataType {
153 if let ArrowDataType::Decimal128(p, s) = self.array.data_type() {
154 ConcreteDataType::decimal128_datatype(*p, *s)
155 } else {
156 ConcreteDataType::decimal128_default_datatype()
157 }
158 }
159
160 fn vector_type_name(&self) -> String {
161 "Decimal128Vector".to_string()
162 }
163
164 fn as_any(&self) -> &dyn Any {
165 self
166 }
167
168 fn len(&self) -> usize {
169 self.array.len()
170 }
171
172 fn to_arrow_array(&self) -> ArrayRef {
173 Arc::new(self.array.clone())
174 }
175
176 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
177 Box::new(self.array.clone())
178 }
179
180 fn validity(&self) -> Validity {
181 vectors::impl_validity_for_vector!(self.array)
182 }
183
184 fn memory_size(&self) -> usize {
185 self.array.get_buffer_memory_size()
186 }
187
188 fn null_count(&self) -> usize {
189 self.array.null_count()
190 }
191
192 fn is_null(&self, row: usize) -> bool {
193 self.array.is_null(row)
194 }
195
196 fn slice(&self, offset: usize, length: usize) -> VectorRef {
197 Arc::new(self.get_slice(offset, length))
198 }
199
200 fn get(&self, index: usize) -> Value {
201 if let Some(decimal) = self.get_decimal128_value_from_array(index) {
202 Value::Decimal128(decimal)
203 } else {
204 Value::Null
205 }
206 }
207
208 fn get_ref(&self, index: usize) -> ValueRef<'_> {
209 if let Some(decimal) = self.get_decimal128_value_from_array(index) {
210 ValueRef::Decimal128(decimal)
211 } else {
212 ValueRef::Null
213 }
214 }
215}
216
217impl From<Decimal128Array> for Decimal128Vector {
218 fn from(array: Decimal128Array) -> Self {
219 Self { array }
220 }
221}
222
223impl From<Vec<Option<i128>>> for Decimal128Vector {
224 fn from(vec: Vec<Option<i128>>) -> Self {
225 let array = Decimal128Array::from_iter(vec);
226 Self { array }
227 }
228}
229
230impl Serializable for Decimal128Vector {
231 fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
232 self.iter_data()
233 .map(|v| match v {
234 None => Ok(serde_json::Value::Null), Some(d) => serde_json::to_value(d),
236 })
237 .collect::<serde_json::Result<_>>()
238 .context(error::SerializeSnafu)
239 }
240}
241
/// Iterator over the elements of a decimal array that yields
/// `Option<Decimal128>` items.
pub struct Decimal128Iter<'a> {
    /// Precision attached to every yielded decimal.
    precision: u8,
    /// Scale attached to every yielded decimal.
    scale: i8,
    /// Underlying Arrow iterator over the raw `i128` values.
    iter: ArrayIter<&'a Decimal128Array>,
}
247
248impl Iterator for Decimal128Iter<'_> {
249 type Item = Option<Decimal128>;
250
251 fn next(&mut self) -> Option<Self::Item> {
252 self.iter
253 .next()
254 .map(|item| item.map(|v| Decimal128::new(v, self.precision, self.scale)))
255 }
256
257 fn size_hint(&self) -> (usize, Option<usize>) {
258 self.iter.size_hint()
259 }
260}
261
262impl ScalarVector for Decimal128Vector {
263 type OwnedItem = Decimal128;
264
265 type RefItem<'a> = Decimal128;
266
267 type Iter<'a> = Decimal128Iter<'a>;
268
269 type Builder = Decimal128VectorBuilder;
270
271 fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
272 self.get_decimal128_value_from_array(idx)
273 }
274
275 fn iter_data(&self) -> Self::Iter<'_> {
276 Self::Iter {
277 precision: self.precision(),
278 scale: self.scale(),
279 iter: self.array.iter(),
280 }
281 }
282}
283
/// Builder that accumulates decimal values into a [`Decimal128Vector`].
pub struct Decimal128VectorBuilder {
    /// Precision reported by `data_type()`.
    precision: u8,
    /// Scale reported by `data_type()`.
    scale: i8,
    /// Underlying Arrow builder that stores the raw `i128` values.
    mutable_array: Decimal128Builder,
}
289
290impl MutableVector for Decimal128VectorBuilder {
291 fn data_type(&self) -> ConcreteDataType {
292 ConcreteDataType::decimal128_datatype(self.precision, self.scale)
293 }
294
295 fn len(&self) -> usize {
296 self.mutable_array.len()
297 }
298
299 fn as_any(&self) -> &dyn Any {
300 self
301 }
302
303 fn as_mut_any(&mut self) -> &mut dyn Any {
304 self
305 }
306
307 fn to_vector(&mut self) -> VectorRef {
308 Arc::new(self.finish())
309 }
310
311 fn to_vector_cloned(&self) -> VectorRef {
312 Arc::new(self.finish_cloned())
313 }
314
315 fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
316 let decimal_val = value.try_into_decimal128()?.map(|v| v.val());
317 self.mutable_array.append_option(decimal_val);
318 Ok(())
319 }
320
321 fn push_null(&mut self) {
322 self.mutable_array.append_null();
323 }
324
325 fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
326 let decimal_vector =
327 vector
328 .as_any()
329 .downcast_ref::<Decimal128Vector>()
330 .context(CastTypeSnafu {
331 msg: format!(
332 "Failed to cast vector from {} to Decimal128Vector",
333 vector.vector_type_name(),
334 ),
335 })?;
336 let slice = decimal_vector.get_slice(offset, length);
337 self.mutable_array
338 .extend(slice.iter_data().map(|v| v.map(|d| d.val())));
339 Ok(())
340 }
341}
342
343impl ScalarVectorBuilder for Decimal128VectorBuilder {
344 type VectorType = Decimal128Vector;
345
346 fn with_capacity(capacity: usize) -> Self {
347 Self {
348 precision: DECIMAL128_MAX_PRECISION,
349 scale: DECIMAL128_DEFAULT_SCALE,
350 mutable_array: Decimal128Builder::with_capacity(capacity),
351 }
352 }
353
354 fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
355 self.mutable_array.append_option(value.map(|v| v.val()));
356 }
357
358 fn finish(&mut self) -> Self::VectorType {
359 Decimal128Vector {
360 array: self.mutable_array.finish(),
361 }
362 }
363
364 fn finish_cloned(&self) -> Self::VectorType {
365 Decimal128Vector {
366 array: self.mutable_array.finish_cloned(),
367 }
368 }
369}
370
371impl Decimal128VectorBuilder {
372 pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self> {
374 let mutable_array = self
375 .mutable_array
376 .with_precision_and_scale(precision, scale)
377 .context(InvalidPrecisionOrScaleSnafu { precision, scale })?;
378 Ok(Self {
379 precision,
380 scale,
381 mutable_array,
382 })
383 }
384}
385
// Macro-generated conversion from an Arrow array into `Decimal128Vector`
// (presumably the `try_from_arrow_array` constructor exercised by the tests in
// this file — confirm against the macro definition in `crate::vectors`).
vectors::impl_try_from_arrow_array_for_vector!(Decimal128Array, Decimal128Vector);
387
388pub(crate) fn replicate_decimal128(
389 vector: &Decimal128Vector,
390 offsets: &[usize],
391) -> Decimal128Vector {
392 assert_eq!(offsets.len(), vector.len());
393
394 if offsets.is_empty() {
395 return vector.get_slice(0, 0);
396 }
397
398 let mut builder = Decimal128VectorBuilder::with_capacity(*offsets.last().unwrap())
400 .with_precision_and_scale(vector.precision(), vector.scale())
401 .unwrap();
402
403 let mut previous_offset = 0;
404
405 for (offset, value) in offsets.iter().zip(vector.array.iter()) {
406 let repeat_times = *offset - previous_offset;
407 match value {
408 Some(data) => {
409 unsafe {
410 builder
412 .mutable_array
413 .append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
414 }
415 }
416 None => {
417 builder.mutable_array.append_nulls(repeat_times);
418 }
419 }
420 previous_offset = *offset;
421 }
422 builder.finish()
423}
424
#[cfg(test)]
pub mod tests {
    use arrow_array::Decimal128Array;
    use common_decimal::Decimal128;

    use super::*;
    use crate::vectors::Int8Vector;
    use crate::vectors::operations::VectorOp;

    /// Converting from Arrow arrays keeps values, precision and scale.
    #[test]
    fn test_from_arrow_decimal128_array() {
        // Default precision and scale.
        let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]);
        let decimal_vector = Decimal128Vector::from(decimal_array);
        let expect = Decimal128Vector::from_values(vec![123, 456]);
        assert_eq!(decimal_vector, expect);

        // Explicit precision and scale via `From`.
        let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let decimal_vector = Decimal128Vector::from(decimal_array);
        let expect = Decimal128Vector::from_values(vec![123, 456])
            .with_precision_and_scale(10, 2)
            .unwrap();
        assert_eq!(decimal_vector, expect);

        // Explicit precision and scale via a dynamically typed Arrow array.
        let decimal_array: ArrayRef = Arc::new(
            Decimal128Array::from(vec![Some(123), Some(456)])
                .with_precision_and_scale(3, 2)
                .unwrap(),
        );
        let decimal_vector = Decimal128Vector::try_from_arrow_array(decimal_array).unwrap();
        let expect = Decimal128Vector::from_values(vec![123, 456])
            .with_precision_and_scale(3, 2)
            .unwrap();
        assert_eq!(decimal_vector, expect);
    }

    /// Both slice constructors produce the same vector as `from_values`.
    #[test]
    fn test_from_slice() {
        let decimal_vector = Decimal128Vector::from_slice([123, 456]);
        let decimal_vector2 = Decimal128Vector::from_wrapper_slice([
            Decimal128::new(123, 10, 2),
            Decimal128::new(456, 10, 2),
        ]);
        let expect = Decimal128Vector::from_values(vec![123, 456]);

        assert_eq!(decimal_vector, expect);
        assert_eq!(decimal_vector2, expect);
    }

    /// Slicing keeps the precision and scale of the source vector.
    #[test]
    fn test_decimal128_vector_slice() {
        let data = vec![100, 200, 300];
        let decimal_vector = Decimal128Vector::from_values(data.clone())
            .with_precision_and_scale(10, 2)
            .unwrap();
        let decimal_vector2 = decimal_vector.slice(1, 2);
        assert_eq!(decimal_vector2.len(), 2);
        assert_eq!(
            decimal_vector2.get(0),
            Value::Decimal128(Decimal128::new(200, 10, 2))
        );
        assert_eq!(
            decimal_vector2.get(1),
            Value::Decimal128(Decimal128::new(300, 10, 2))
        );
    }

    /// Element accessors agree with each other, and overflowing values become
    /// null when the precision is narrowed with `with_precision_and_scale_to_null`.
    #[test]
    fn test_decimal128_vector_basic() {
        let data = vec![100, 200, 300];
        let decimal_vector = Decimal128Vector::from_values(data.clone())
            .with_precision_and_scale(10, 2)
            .unwrap();

        assert_eq!(decimal_vector.value_as_string(0), "1.00");

        for (i, raw) in data.iter().copied().enumerate() {
            let expected = Decimal128::new(raw, 10, 2);
            assert_eq!(decimal_vector.get_data(i), Some(expected));
            assert_eq!(decimal_vector.get(i), Value::Decimal128(expected));
            assert_eq!(decimal_vector.get_ref(i), ValueRef::Decimal128(expected));
        }

        // None of the values fit into precision 2, so all of them become null.
        let decimal_vector = decimal_vector
            .with_precision_and_scale_to_null(2, 1)
            .unwrap();
        assert_eq!(decimal_vector.len(), 3);
        assert!(decimal_vector.is_null(0));
        assert!(decimal_vector.is_null(1));
        assert!(decimal_vector.is_null(2));
    }

    /// The builder honours an explicit precision and scale and otherwise uses
    /// the defaults.
    #[test]
    fn test_decimal128_vector_builder() {
        let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3)
            .with_precision_and_scale(10, 2)
            .unwrap();
        decimal_builder.push(Some(Decimal128::new(100, 10, 2)));
        decimal_builder.push(Some(Decimal128::new(200, 10, 2)));
        decimal_builder.push(Some(Decimal128::new(300, 10, 2)));
        let decimal_vector = decimal_builder.finish();
        assert_eq!(decimal_vector.len(), 3);
        assert_eq!(decimal_vector.precision(), 10);
        assert_eq!(decimal_vector.scale(), 2);
        assert_eq!(
            decimal_vector.get(0),
            Value::Decimal128(Decimal128::new(100, 10, 2))
        );
        assert_eq!(
            decimal_vector.get(1),
            Value::Decimal128(Decimal128::new(200, 10, 2))
        );
        assert_eq!(
            decimal_vector.get(2),
            Value::Decimal128(Decimal128::new(300, 10, 2))
        );

        // Without an explicit precision and scale the builder uses the defaults.
        let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3);
        decimal_builder.push(Some(Decimal128::new(123, 38, 10)));
        decimal_builder.push(Some(Decimal128::new(1234, 38, 10)));
        decimal_builder.push(Some(Decimal128::new(12345, 38, 10)));
        let decimal_vector = decimal_builder.finish();
        assert_eq!(decimal_vector.precision(), 38);
        assert_eq!(decimal_vector.scale(), 10);
        // Narrowing the precision succeeds; validating the values afterwards fails.
        let result = decimal_vector
            .with_precision_and_scale(3, 2)
            .and_then(|x| x.validate_decimal_precision(3));
        assert_eq!(
            "Value exceeds the precision 3 bound",
            result.unwrap_err().to_string()
        );
    }

    /// Casting integers to a narrow decimal type nulls the value that does not fit.
    #[test]
    fn test_cast_to_decimal128() {
        let vector = Int8Vector::from_values(vec![1, 2, 3, 4, 100]);
        let casted_vector = vector.cast(&ConcreteDataType::decimal128_datatype(3, 1));
        assert!(casted_vector.is_ok());
        let vector = casted_vector.unwrap();
        let array = vector.as_any().downcast_ref::<Decimal128Vector>().unwrap();
        // The last input (100) is the one expected to end up as null.
        assert!(array.is_null(4));
    }

    /// `iter_data` yields every element with the vector's precision and scale.
    #[test]
    fn test_decimal128_vector_iter_data() {
        let vector = Decimal128Vector::from_values(vec![1, 2, 3, 4])
            .with_precision_and_scale(3, 1)
            .unwrap();
        let mut iter = vector.iter_data();
        assert_eq!(iter.next(), Some(Some(Decimal128::new(1, 3, 1))));
        assert_eq!(iter.next(), Some(Some(Decimal128::new(2, 3, 1))));
        assert_eq!(iter.next(), Some(Some(Decimal128::new(3, 3, 1))));
        assert_eq!(iter.next(), Some(Some(Decimal128::new(4, 3, 1))));
        assert_eq!(iter.next(), None);

        let values = vector
            .iter_data()
            .filter_map(|v| v.map(|x| x.val() * 2))
            .collect::<Vec<_>>();
        assert_eq!(values, vec![2, 4, 6, 8]);
    }

    /// `finish_cloned` returns the current contents without draining the builder.
    #[test]
    fn test_decimal128_vector_builder_finish_cloned() {
        let mut builder = Decimal128VectorBuilder::with_capacity(1024);
        builder.push(Some(Decimal128::new(1, 3, 1)));
        builder.push(Some(Decimal128::new(1, 3, 1)));
        builder.push(Some(Decimal128::new(1, 3, 1)));
        builder.push(Some(Decimal128::new(1, 3, 1)));
        let vector = builder.finish_cloned();
        assert_eq!(vector.len(), 4);
        assert_eq!(builder.len(), 4);
    }
}