1use std::any::Any;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use arrow_array::builder::{ArrayBuilder, Decimal128Builder};
20use arrow_array::iterator::ArrayIter;
21use arrow_array::{Array, ArrayRef, Decimal128Array};
22use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
23use common_decimal::Decimal128;
24use snafu::{OptionExt, ResultExt};
25
26use crate::arrow::datatypes::DataType as ArrowDataType;
27use crate::data_type::ConcreteDataType;
28use crate::error::{
29 self, CastTypeSnafu, InvalidPrecisionOrScaleSnafu, Result, ValueExceedsPrecisionSnafu,
30};
31use crate::prelude::{ScalarVector, ScalarVectorBuilder};
32use crate::serialize::Serializable;
33use crate::value::{Value, ValueRef};
34use crate::vectors;
35use crate::vectors::{MutableVector, Validity, Vector, VectorRef};
36
/// A vector of 128-bit decimal values backed by an Arrow [`Decimal128Array`].
///
/// Precision and scale are not stored separately; they are read from the
/// underlying array whenever they are needed.
#[derive(Debug, PartialEq)]
pub struct Decimal128Vector {
    /// The wrapped Arrow array that owns the values.
    array: Decimal128Array,
}
42
43impl Decimal128Vector {
44 pub fn new(array: Decimal128Array) -> Self {
46 Self { array }
47 }
48
49 pub fn from_values<I: IntoIterator<Item = i128>>(iter: I) -> Self {
51 Self {
52 array: Decimal128Array::from_iter_values(iter),
53 }
54 }
55
56 pub fn from_slice<P: AsRef<[i128]>>(slice: P) -> Self {
58 let iter = slice.as_ref().iter().copied();
59 Self {
60 array: Decimal128Array::from_iter_values(iter),
61 }
62 }
63
64 pub fn from_wrapper_slice<P: AsRef<[Decimal128]>>(slice: P) -> Self {
66 let iter = slice.as_ref().iter().copied().map(|v| v.val());
67 Self {
68 array: Decimal128Array::from_iter_values(iter),
69 }
70 }
71
72 pub fn get_slice(&self, offset: usize, length: usize) -> Self {
74 let array = self.array.slice(offset, length);
75 Self { array }
76 }
77
78 pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self> {
86 self.validate_decimal_precision(precision)?;
88 let array = self
89 .array
90 .with_precision_and_scale(precision, scale)
91 .context(InvalidPrecisionOrScaleSnafu { precision, scale })?;
92 Ok(Self { array })
93 }
94
95 pub fn with_precision_and_scale_to_null(self, precision: u8, scale: i8) -> Result<Self> {
102 self.null_if_overflow_precision(precision)
103 .with_precision_and_scale(precision, scale)
104 }
105
106 pub fn value_as_string(&self, idx: usize) -> String {
108 self.array.value_as_string(idx)
109 }
110
111 pub fn precision(&self) -> u8 {
113 self.array.precision()
114 }
115
116 pub fn scale(&self) -> i8 {
118 self.array.scale()
119 }
120
121 pub(crate) fn as_arrow(&self) -> &dyn Array {
123 &self.array
124 }
125
126 fn validate_decimal_precision(&self, precision: u8) -> Result<()> {
128 self.array
129 .validate_decimal_precision(precision)
130 .context(ValueExceedsPrecisionSnafu { precision })
131 }
132
133 fn null_if_overflow_precision(&self, precision: u8) -> Self {
135 Self {
136 array: self.array.null_if_overflow_precision(precision),
137 }
138 }
139
140 fn get_decimal128_value_from_array(&self, index: usize) -> Option<Decimal128> {
142 if self.array.is_valid(index) {
143 let value = unsafe { self.array.value_unchecked(index) };
145 Some(Decimal128::new(value, self.precision(), self.scale()))
147 } else {
148 None
149 }
150 }
151}
152
153impl Vector for Decimal128Vector {
154 fn data_type(&self) -> ConcreteDataType {
155 if let ArrowDataType::Decimal128(p, s) = self.array.data_type() {
156 ConcreteDataType::decimal128_datatype(*p, *s)
157 } else {
158 ConcreteDataType::decimal128_default_datatype()
159 }
160 }
161
162 fn vector_type_name(&self) -> String {
163 "Decimal128Vector".to_string()
164 }
165
166 fn as_any(&self) -> &dyn Any {
167 self
168 }
169
170 fn len(&self) -> usize {
171 self.array.len()
172 }
173
174 fn to_arrow_array(&self) -> ArrayRef {
175 Arc::new(self.array.clone())
176 }
177
178 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
179 Box::new(self.array.clone())
180 }
181
182 fn validity(&self) -> Validity {
183 vectors::impl_validity_for_vector!(self.array)
184 }
185
186 fn memory_size(&self) -> usize {
187 self.array.get_buffer_memory_size()
188 }
189
190 fn null_count(&self) -> usize {
191 self.array.null_count()
192 }
193
194 fn is_null(&self, row: usize) -> bool {
195 self.array.is_null(row)
196 }
197
198 fn slice(&self, offset: usize, length: usize) -> VectorRef {
199 Arc::new(self.get_slice(offset, length))
200 }
201
202 fn get(&self, index: usize) -> Value {
203 if let Some(decimal) = self.get_decimal128_value_from_array(index) {
204 Value::Decimal128(decimal)
205 } else {
206 Value::Null
207 }
208 }
209
210 fn get_ref(&self, index: usize) -> ValueRef {
211 if let Some(decimal) = self.get_decimal128_value_from_array(index) {
212 ValueRef::Decimal128(decimal)
213 } else {
214 ValueRef::Null
215 }
216 }
217}
218
219impl From<Decimal128Array> for Decimal128Vector {
220 fn from(array: Decimal128Array) -> Self {
221 Self { array }
222 }
223}
224
225impl From<Vec<Option<i128>>> for Decimal128Vector {
226 fn from(vec: Vec<Option<i128>>) -> Self {
227 let array = Decimal128Array::from_iter(vec);
228 Self { array }
229 }
230}
231
232impl Serializable for Decimal128Vector {
233 fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
234 self.iter_data()
235 .map(|v| match v {
236 None => Ok(serde_json::Value::Null), Some(d) => serde_json::to_value(d),
238 })
239 .collect::<serde_json::Result<_>>()
240 .context(error::SerializeSnafu)
241 }
242}
243
/// Iterator over the elements of a [`Decimal128Vector`], yielding each one as
/// an optional [`Decimal128`].
pub struct Decimal128Iter<'a> {
    /// Precision attached to every yielded decimal.
    precision: u8,
    /// Scale attached to every yielded decimal.
    scale: i8,
    /// Iterator over the raw values of the borrowed Arrow array.
    iter: ArrayIter<&'a Decimal128Array>,
}
249
250impl Iterator for Decimal128Iter<'_> {
251 type Item = Option<Decimal128>;
252
253 fn next(&mut self) -> Option<Self::Item> {
254 self.iter
255 .next()
256 .map(|item| item.map(|v| Decimal128::new(v, self.precision, self.scale)))
257 }
258
259 fn size_hint(&self) -> (usize, Option<usize>) {
260 self.iter.size_hint()
261 }
262}
263
264impl ScalarVector for Decimal128Vector {
265 type OwnedItem = Decimal128;
266
267 type RefItem<'a> = Decimal128;
268
269 type Iter<'a> = Decimal128Iter<'a>;
270
271 type Builder = Decimal128VectorBuilder;
272
273 fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
274 self.get_decimal128_value_from_array(idx)
275 }
276
277 fn iter_data(&self) -> Self::Iter<'_> {
278 Self::Iter {
279 precision: self.precision(),
280 scale: self.scale(),
281 iter: self.array.iter(),
282 }
283 }
284}
285
/// Builder that accumulates decimal values and produces a
/// [`Decimal128Vector`].
pub struct Decimal128VectorBuilder {
    /// Precision reported by `data_type()` for this builder.
    precision: u8,
    /// Scale reported by `data_type()` for this builder.
    scale: i8,
    /// The Arrow builder that stores the appended values.
    mutable_array: Decimal128Builder,
}
291
292impl MutableVector for Decimal128VectorBuilder {
293 fn data_type(&self) -> ConcreteDataType {
294 ConcreteDataType::decimal128_datatype(self.precision, self.scale)
295 }
296
297 fn len(&self) -> usize {
298 self.mutable_array.len()
299 }
300
301 fn as_any(&self) -> &dyn Any {
302 self
303 }
304
305 fn as_mut_any(&mut self) -> &mut dyn Any {
306 self
307 }
308
309 fn to_vector(&mut self) -> VectorRef {
310 Arc::new(self.finish())
311 }
312
313 fn to_vector_cloned(&self) -> VectorRef {
314 Arc::new(self.finish_cloned())
315 }
316
317 fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
318 let decimal_val = value.as_decimal128()?.map(|v| v.val());
319 self.mutable_array.append_option(decimal_val);
320 Ok(())
321 }
322
323 fn push_null(&mut self) {
324 self.mutable_array.append_null();
325 }
326
327 fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
328 let decimal_vector =
329 vector
330 .as_any()
331 .downcast_ref::<Decimal128Vector>()
332 .context(CastTypeSnafu {
333 msg: format!(
334 "Failed to cast vector from {} to Decimal128Vector",
335 vector.vector_type_name(),
336 ),
337 })?;
338 let slice = decimal_vector.get_slice(offset, length);
339 self.mutable_array
340 .extend(slice.iter_data().map(|v| v.map(|d| d.val())));
341 Ok(())
342 }
343}
344
345impl ScalarVectorBuilder for Decimal128VectorBuilder {
346 type VectorType = Decimal128Vector;
347
348 fn with_capacity(capacity: usize) -> Self {
349 Self {
350 precision: DECIMAL128_MAX_PRECISION,
351 scale: DECIMAL128_DEFAULT_SCALE,
352 mutable_array: Decimal128Builder::with_capacity(capacity),
353 }
354 }
355
356 fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
357 self.mutable_array.append_option(value.map(|v| v.val()));
358 }
359
360 fn finish(&mut self) -> Self::VectorType {
361 Decimal128Vector {
362 array: self.mutable_array.finish(),
363 }
364 }
365
366 fn finish_cloned(&self) -> Self::VectorType {
367 Decimal128Vector {
368 array: self.mutable_array.finish_cloned(),
369 }
370 }
371}
372
373impl Decimal128VectorBuilder {
374 pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self> {
376 let mutable_array = self
377 .mutable_array
378 .with_precision_and_scale(precision, scale)
379 .context(InvalidPrecisionOrScaleSnafu { precision, scale })?;
380 Ok(Self {
381 precision,
382 scale,
383 mutable_array,
384 })
385 }
386}
387
// Presumably generates `Decimal128Vector::try_from_arrow_array`, which the
// tests in this file call; the macro body is defined elsewhere (verify there).
vectors::impl_try_from_arrow_array_for_vector!(Decimal128Array, Decimal128Vector);
389
390pub(crate) fn replicate_decimal128(
391 vector: &Decimal128Vector,
392 offsets: &[usize],
393) -> Decimal128Vector {
394 assert_eq!(offsets.len(), vector.len());
395
396 if offsets.is_empty() {
397 return vector.get_slice(0, 0);
398 }
399
400 let mut builder = Decimal128VectorBuilder::with_capacity(*offsets.last().unwrap())
402 .with_precision_and_scale(vector.precision(), vector.scale())
403 .unwrap();
404
405 let mut previous_offset = 0;
406
407 for (offset, value) in offsets.iter().zip(vector.array.iter()) {
408 let repeat_times = *offset - previous_offset;
409 match value {
410 Some(data) => {
411 unsafe {
412 builder
414 .mutable_array
415 .append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
416 }
417 }
418 None => {
419 builder.mutable_array.append_nulls(repeat_times);
420 }
421 }
422 previous_offset = *offset;
423 }
424 builder.finish()
425}
426
#[cfg(test)]
pub mod tests {
    use arrow_array::Decimal128Array;
    use common_decimal::Decimal128;

    use super::*;
    use crate::vectors::operations::VectorOp;
    use crate::vectors::Int8Vector;

    /// Vectors built from Arrow arrays must equal vectors built from raw
    /// values with the same precision and scale.
    #[test]
    fn test_from_arrow_decimal128_array() {
        let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]);
        let decimal_vector = Decimal128Vector::from(decimal_array);
        let expect = Decimal128Vector::from_values(vec![123, 456]);
        assert_eq!(decimal_vector, expect);

        let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let decimal_vector = Decimal128Vector::from(decimal_array);
        let expect = Decimal128Vector::from_values(vec![123, 456])
            .with_precision_and_scale(10, 2)
            .unwrap();
        assert_eq!(decimal_vector, expect);

        let decimal_array: ArrayRef = Arc::new(
            Decimal128Array::from(vec![Some(123), Some(456)])
                .with_precision_and_scale(3, 2)
                .unwrap(),
        );
        let decimal_vector = Decimal128Vector::try_from_arrow_array(decimal_array).unwrap();
        let expect = Decimal128Vector::from_values(vec![123, 456])
            .with_precision_and_scale(3, 2)
            .unwrap();
        assert_eq!(decimal_vector, expect);
    }

    /// Both slice constructors must agree with `from_values`.
    #[test]
    fn test_from_slice() {
        let decimal_vector = Decimal128Vector::from_slice([123, 456]);
        let decimal_vector2 = Decimal128Vector::from_wrapper_slice([
            Decimal128::new(123, 10, 2),
            Decimal128::new(456, 10, 2),
        ]);
        let expect = Decimal128Vector::from_values(vec![123, 456]);

        assert_eq!(decimal_vector, expect);
        assert_eq!(decimal_vector2, expect);
    }

    /// Slicing keeps the precision and scale of the source vector.
    #[test]
    fn test_decimal128_vector_slice() {
        let decimal_vector = Decimal128Vector::from_values(vec![100, 200, 300])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let decimal_vector2 = decimal_vector.slice(1, 2);
        assert_eq!(decimal_vector2.len(), 2);
        assert_eq!(
            decimal_vector2.get(0),
            Value::Decimal128(Decimal128::new(200, 10, 2))
        );
        assert_eq!(
            decimal_vector2.get(1),
            Value::Decimal128(Decimal128::new(300, 10, 2))
        );
    }

    /// Covers the element accessors and the null-on-overflow conversion.
    #[test]
    fn test_decimal128_vector_basic() {
        let data = vec![100, 200, 300];
        let decimal_vector = Decimal128Vector::from_values(data.clone())
            .with_precision_and_scale(10, 2)
            .unwrap();

        // Raw value 100 with scale 2 is rendered as "1.00".
        assert_eq!(decimal_vector.value_as_string(0), "1.00");

        // Every accessor must return the same decimal for each index.
        for i in 0..data.len() {
            assert_eq!(
                decimal_vector.get_data(i),
                Some(Decimal128::new((i + 1) as i128 * 100, 10, 2))
            );
            assert_eq!(
                decimal_vector.get(i),
                Value::Decimal128(Decimal128::new((i + 1) as i128 * 100, 10, 2))
            );
            assert_eq!(
                decimal_vector.get_ref(i),
                ValueRef::Decimal128(Decimal128::new((i + 1) as i128 * 100, 10, 2))
            );
        }

        // None of the three-digit values fits in precision 2, so all of them
        // become null.
        let decimal_vector = decimal_vector
            .with_precision_and_scale_to_null(2, 1)
            .unwrap();
        assert_eq!(decimal_vector.len(), 3);
        assert!(decimal_vector.is_null(0));
        assert!(decimal_vector.is_null(1));
        assert!(decimal_vector.is_null(2));
    }

    /// Covers the builder with explicit and with default precision and scale.
    #[test]
    fn test_decimal128_vector_builder() {
        let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3)
            .with_precision_and_scale(10, 2)
            .unwrap();
        decimal_builder.push(Some(Decimal128::new(100, 10, 2)));
        decimal_builder.push(Some(Decimal128::new(200, 10, 2)));
        decimal_builder.push(Some(Decimal128::new(300, 10, 2)));
        let decimal_vector = decimal_builder.finish();
        assert_eq!(decimal_vector.len(), 3);
        assert_eq!(decimal_vector.precision(), 10);
        assert_eq!(decimal_vector.scale(), 2);
        assert_eq!(
            decimal_vector.get(0),
            Value::Decimal128(Decimal128::new(100, 10, 2))
        );
        assert_eq!(
            decimal_vector.get(1),
            Value::Decimal128(Decimal128::new(200, 10, 2))
        );
        assert_eq!(
            decimal_vector.get(2),
            Value::Decimal128(Decimal128::new(300, 10, 2))
        );

        // A builder without explicit settings uses precision 38 and scale 10.
        let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3);
        decimal_builder.push(Some(Decimal128::new(123, 38, 10)));
        decimal_builder.push(Some(Decimal128::new(1234, 38, 10)));
        decimal_builder.push(Some(Decimal128::new(12345, 38, 10)));
        let decimal_vector = decimal_builder.finish();
        assert_eq!(decimal_vector.precision(), 38);
        assert_eq!(decimal_vector.scale(), 10);
        // The stored values do not all fit in precision 3, so narrowing fails.
        let result = decimal_vector.with_precision_and_scale(3, 2);
        assert_eq!(
            "Value exceeds the precision 3 bound",
            result.unwrap_err().to_string()
        );
    }

    /// Casting integers to a decimal type nulls out values that do not fit.
    #[test]
    fn test_cast_to_decimal128() {
        let vector = Int8Vector::from_values(vec![1, 2, 3, 4, 100]);
        let vector = vector
            .cast(&ConcreteDataType::decimal128_datatype(3, 1))
            .unwrap();
        let array = vector.as_any().downcast_ref::<Decimal128Vector>().unwrap();
        // The last input (100) is expected to come out as null for decimal(3, 1).
        assert!(array.is_null(4));
    }

    /// `iter_data` yields every element tagged with the vector's precision
    /// and scale.
    #[test]
    fn test_decimal128_vector_iter_data() {
        let vector = Decimal128Vector::from_values(vec![1, 2, 3, 4])
            .with_precision_and_scale(3, 1)
            .unwrap();
        let mut iter = vector.iter_data();
        assert_eq!(iter.next(), Some(Some(Decimal128::new(1, 3, 1))));
        assert_eq!(iter.next(), Some(Some(Decimal128::new(2, 3, 1))));
        assert_eq!(iter.next(), Some(Some(Decimal128::new(3, 3, 1))));
        assert_eq!(iter.next(), Some(Some(Decimal128::new(4, 3, 1))));
        assert_eq!(iter.next(), None);

        let values = vector
            .iter_data()
            .filter_map(|v| v.map(|x| x.val() * 2))
            .collect::<Vec<_>>();
        assert_eq!(values, vec![2, 4, 6, 8]);
    }

    /// `finish_cloned` must leave the builder's contents in place.
    #[test]
    fn test_decimal128_vector_builder_finish_cloned() {
        let mut builder = Decimal128VectorBuilder::with_capacity(1024);
        builder.push(Some(Decimal128::new(1, 3, 1)));
        builder.push(Some(Decimal128::new(1, 3, 1)));
        builder.push(Some(Decimal128::new(1, 3, 1)));
        builder.push(Some(Decimal128::new(1, 3, 1)));
        let vector = builder.finish_cloned();
        assert_eq!(vector.len(), 4);
        assert_eq!(builder.len(), 4);
    }
}