1use std::sync::Arc;
18
19use datafusion::arrow::buffer::NullBuffer;
20use datafusion::arrow::datatypes::Field;
21use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
22use datatypes::arrow::datatypes::{DataType, Int64Type};
23use snafu::{ensure, OptionExt};
24
25use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result};
26
/// An `(offset, length)` pair describing one contiguous slice of a [`RangeArray`]'s
/// shared values array.
pub type RangeTuple = (u32, u32);
28
/// An array of ranges over one shared values array.
///
/// Internally this is a `DictionaryArray<Int64Type>` whose keys are NOT indices
/// into the dictionary values: each key packs an `(offset, length)` pair (see
/// `pack`/`unpack` in this module), and the dictionary's values are the shared
/// array that every range slices into.
pub struct RangeArray {
    // Dictionary whose keys are packed `(offset, length)` ranges over its values.
    array: DictionaryArray<Int64Type>,
}
71
72impl RangeArray {
73 pub const fn key_type() -> DataType {
74 DataType::Int64
75 }
76
77 pub fn value_type(&self) -> DataType {
78 self.array.value_type()
79 }
80
81 pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
82 let ranges_iter = dict
83 .keys()
84 .iter()
85 .map(|compound_key| compound_key.map(unpack))
86 .collect::<Option<Vec<_>>>()
87 .context(EmptyRangeSnafu)?;
88 Self::check_ranges(dict.values().len(), ranges_iter)?;
89
90 Ok(Self { array: dict })
91 }
92
93 pub fn from_ranges<R>(values: ArrayRef, ranges: R) -> Result<Self>
94 where
95 R: IntoIterator<Item = RangeTuple> + Clone,
96 {
97 Self::check_ranges(values.len(), ranges.clone())?;
98
99 unsafe { Ok(Self::from_ranges_unchecked(values, ranges)) }
100 }
101
102 pub unsafe fn from_ranges_unchecked<R>(values: ArrayRef, ranges: R) -> Self
111 where
112 R: IntoIterator<Item = RangeTuple>,
113 {
114 let key_array = Int64Array::from_iter(
115 ranges
116 .into_iter()
117 .map(|(offset, length)| pack(offset, length)),
118 );
119
120 let mut data = ArrayData::builder(DataType::Dictionary(
124 Box::new(Self::key_type()),
125 Box::new(values.data_type().clone()),
126 ))
127 .len(key_array.len())
128 .add_buffer(key_array.to_data().buffers()[0].clone())
129 .add_child_data(values.to_data());
130 match key_array.to_data().nulls() {
131 Some(buffer) if key_array.to_data().null_count() > 0 => {
132 data = data
133 .nulls(Some(buffer.clone()))
134 .null_count(key_array.to_data().null_count());
135 }
136 _ => data = data.null_count(0),
137 }
138 let array_data = unsafe { data.build_unchecked() };
139
140 Self {
141 array: array_data.into(),
142 }
143 }
144
145 pub fn len(&self) -> usize {
146 self.array.keys().len()
147 }
148
149 pub fn is_empty(&self) -> bool {
150 self.array.keys().is_empty()
151 }
152
153 pub fn get(&self, index: usize) -> Option<ArrayRef> {
154 if index >= self.len() {
155 return None;
156 }
157
158 let compound_key = self.array.keys().value(index);
159 let (offset, length) = unpack(compound_key);
160 let array = self.array.values().slice(offset as usize, length as usize);
161
162 Some(array)
163 }
164
165 pub fn get_offset_length(&self, index: usize) -> Option<(usize, usize)> {
166 if index >= self.len() {
167 return None;
168 }
169
170 let compound_key = self.array.keys().value(index);
171 let (offset, length) = unpack(compound_key);
172
173 Some((offset as usize, length as usize))
174 }
175
176 pub fn into_dict(self) -> DictionaryArray<Int64Type> {
180 self.array
181 }
182
183 fn check_ranges<R>(value_len: usize, ranges: R) -> Result<()>
184 where
185 R: IntoIterator<Item = RangeTuple>,
186 {
187 for (offset, length) in ranges.into_iter() {
188 ensure!(
189 offset as usize + length as usize <= value_len,
190 IllegalRangeSnafu {
191 offset,
192 length,
193 len: value_len
194 }
195 );
196 }
197 Ok(())
198 }
199
200 pub fn convert_field(field: &Field) -> Field {
203 let value_type = Box::new(field.data_type().clone());
204 Field::new(
205 field.name(),
206 Self::convert_data_type(*value_type),
207 field.is_nullable(),
208 )
209 }
210
211 pub fn convert_data_type(value_type: DataType) -> DataType {
213 DataType::Dictionary(Box::new(Self::key_type()), Box::new(value_type))
214 }
215
216 pub fn values(&self) -> &ArrayRef {
217 self.array.values()
218 }
219
220 pub fn ranges(&self) -> impl Iterator<Item = Option<RangeTuple>> + '_ {
221 self.array
222 .keys()
223 .into_iter()
224 .map(|compound| compound.map(unpack))
225 }
226}
227
228impl Array for RangeArray {
229 fn as_any(&self) -> &dyn std::any::Any {
230 self
231 }
232
233 fn into_data(self) -> ArrayData {
234 self.array.into_data()
235 }
236
237 fn to_data(&self) -> ArrayData {
238 self.array.to_data()
239 }
240
241 fn slice(&self, offset: usize, length: usize) -> ArrayRef {
242 Arc::new(self.array.slice(offset, length))
243 }
244
245 fn nulls(&self) -> Option<&NullBuffer> {
246 self.array.nulls()
247 }
248
249 fn data_type(&self) -> &DataType {
250 self.array.data_type()
251 }
252
253 fn len(&self) -> usize {
254 self.len()
255 }
256
257 fn is_empty(&self) -> bool {
258 self.is_empty()
259 }
260
261 fn offset(&self) -> usize {
262 self.array.offset()
263 }
264
265 fn get_buffer_memory_size(&self) -> usize {
266 self.array.get_buffer_memory_size()
267 }
268
269 fn get_array_memory_size(&self) -> usize {
270 self.array.get_array_memory_size()
271 }
272}
273
274impl std::fmt::Debug for RangeArray {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 let ranges = self
277 .array
278 .keys()
279 .iter()
280 .map(|compound_key| {
281 compound_key.map(|key| {
282 let (offset, length) = unpack(key);
283 offset..(offset + length)
284 })
285 })
286 .collect::<Vec<_>>();
287 f.debug_struct("RangeArray")
288 .field("base array", self.array.values())
289 .field("ranges", &ranges)
290 .finish()
291 }
292}
293
/// Packs an `(offset, length)` pair into one `i64` dictionary key: `offset` goes
/// in the low 32 bits and `length` in the high 32 bits.
///
/// Explicit shifts keep the encoding identical on every platform. The result
/// matches the earlier byte-level cast on little-endian targets.
fn pack(offset: u32, length: u32) -> i64 {
    // Reinterpreting the u64 bits as i64 is intentional; `unpack` reverses it.
    ((u64::from(length) << 32) | u64::from(offset)) as i64
}

/// Inverse of [`pack`]: splits a key back into `(offset, length)`.
fn unpack(compound: i64) -> (u32, u32) {
    let bits = compound as u64;
    // Truncating to u32 keeps the low half, which holds `offset`.
    (bits as u32, (bits >> 32) as u32)
}
304
#[cfg(test)]
mod test {
    use std::fmt::Write;
    use std::sync::Arc;

    use datatypes::arrow::array::UInt64Array;

    use super::*;

    /// Debug-formats every element of `range_array` (via `RangeArray::get`), one per line.
    fn expand_format(range_array: &RangeArray) -> String {
        let mut result = String::new();
        for i in 0..range_array.len() {
            writeln!(result, "{:?}", range_array.get(i)).unwrap();
        }
        result
    }

    #[test]
    fn construct_from_ranges() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert_eq!(range_array.len(), 6);

        // Arrow's `PrimitiveArray` Debug output indents each element by two spaces.
        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n  3,\
            \n  4,\
            \n  5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  4,\
            \n  5,\
            \n  6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  9,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n])\
            \n",
        );

        let formatted = expand_format(&range_array);
        assert_eq!(formatted, expected);
    }

    #[test]
    fn illegal_range() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(9, 1)];
        assert!(RangeArray::from_ranges(values_array, ranges).is_err());
    }

    #[test]
    fn dict_array_round_trip() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(0, 4), (1, 4), (2, 4), (3, 4), (4, 4), (5, 4)];
        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n  3,\
            \n  4,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  2,\
            \n  3,\
            \n  4,\
            \n  5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  3,\
            \n  4,\
            \n  5,\
            \n  6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  4,\
            \n  5,\
            \n  6,\
            \n  7,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  5,\
            \n  6,\
            \n  7,\
            \n  8,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  6,\
            \n  7,\
            \n  8,\
            \n  9,\
            \n])\
            \n",
        );

        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert_eq!(range_array.len(), 6);
        let formatted = expand_format(&range_array);
        assert_eq!(formatted, expected);

        // Converting to the raw dictionary and back must preserve every range.
        let dict_array = range_array.into_dict();
        let rounded_range_array = RangeArray::try_new(dict_array).unwrap();
        let formatted = expand_format(&rounded_range_array);
        assert_eq!(formatted, expected);
    }

    #[test]
    fn empty_range_array() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [];
        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert!(range_array.is_empty());
    }

    #[test]
    fn out_of_bounds_index() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3]));
        let range_array = RangeArray::from_ranges(values_array, [(1, 2)]).unwrap();
        assert_eq!(range_array.get_offset_length(0), Some((1, 2)));
        assert!(range_array.get(1).is_none());
        assert!(range_array.get_offset_length(1).is_none());
    }

    #[test]
    fn ranges_are_decoded() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4]));
        let ranges = [(0, 1), (1, 3), (4, 0)];
        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        let decoded = range_array.ranges().collect::<Vec<_>>();
        assert_eq!(decoded, vec![Some((0, 1)), Some((1, 3)), Some((4, 0))]);
    }

    #[test]
    fn pack_unpack_round_trip() {
        let cases = [
            (0, 0),
            (3, 7),
            (u32::MAX, 0),
            (0, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (offset, length) in cases {
            assert_eq!(unpack(pack(offset, length)), (offset, length));
        }
    }
}