1use datafusion::arrow::datatypes::Field;
18use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
19use datatypes::arrow::datatypes::{DataType, Int64Type};
20use snafu::{OptionExt, ensure};
21
22use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result};
23
/// One range over the values array, written as an `(offset, length)` pair.
pub type RangeTuple = (u32, u32);
25
26pub struct RangeArray {
66 array: DictionaryArray<Int64Type>,
67}
68
69impl RangeArray {
70 pub const fn key_type() -> DataType {
71 DataType::Int64
72 }
73
74 pub fn value_type(&self) -> DataType {
75 self.array.value_type()
76 }
77
78 pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
79 let ranges_iter = dict
80 .keys()
81 .iter()
82 .map(|compound_key| compound_key.map(unpack))
83 .collect::<Option<Vec<_>>>()
84 .context(EmptyRangeSnafu)?;
85 Self::check_ranges(dict.values().len(), ranges_iter)?;
86
87 Ok(Self { array: dict })
88 }
89
90 pub fn from_ranges<R>(values: ArrayRef, ranges: R) -> Result<Self>
91 where
92 R: IntoIterator<Item = RangeTuple> + Clone,
93 {
94 Self::check_ranges(values.len(), ranges.clone())?;
95
96 unsafe { Ok(Self::from_ranges_unchecked(values, ranges)) }
97 }
98
99 pub unsafe fn from_ranges_unchecked<R>(values: ArrayRef, ranges: R) -> Self
108 where
109 R: IntoIterator<Item = RangeTuple>,
110 {
111 let key_array = Int64Array::from_iter(
112 ranges
113 .into_iter()
114 .map(|(offset, length)| pack(offset, length)),
115 );
116
117 let mut data = ArrayData::builder(DataType::Dictionary(
121 Box::new(Self::key_type()),
122 Box::new(values.data_type().clone()),
123 ))
124 .len(key_array.len())
125 .add_buffer(key_array.to_data().buffers()[0].clone())
126 .add_child_data(values.to_data());
127 match key_array.to_data().nulls() {
128 Some(buffer) if key_array.to_data().null_count() > 0 => {
129 data = data
130 .nulls(Some(buffer.clone()))
131 .null_count(key_array.to_data().null_count());
132 }
133 _ => data = data.null_count(0),
134 }
135 let array_data = unsafe { data.build_unchecked() };
136
137 Self {
138 array: array_data.into(),
139 }
140 }
141
142 pub fn len(&self) -> usize {
143 self.array.keys().len()
144 }
145
146 pub fn is_empty(&self) -> bool {
147 self.array.keys().is_empty()
148 }
149
150 pub fn get(&self, index: usize) -> Option<ArrayRef> {
151 if index >= self.len() {
152 return None;
153 }
154
155 let compound_key = self.array.keys().value(index);
156 let (offset, length) = unpack(compound_key);
157 let array = self.array.values().slice(offset as usize, length as usize);
158
159 Some(array)
160 }
161
162 pub fn get_offset_length(&self, index: usize) -> Option<(usize, usize)> {
163 if index >= self.len() {
164 return None;
165 }
166
167 let compound_key = self.array.keys().value(index);
168 let (offset, length) = unpack(compound_key);
169
170 Some((offset as usize, length as usize))
171 }
172
173 pub fn into_dict(self) -> DictionaryArray<Int64Type> {
177 self.array
178 }
179
180 fn check_ranges<R>(value_len: usize, ranges: R) -> Result<()>
181 where
182 R: IntoIterator<Item = RangeTuple>,
183 {
184 for (offset, length) in ranges.into_iter() {
185 ensure!(
186 offset as usize + length as usize <= value_len,
187 IllegalRangeSnafu {
188 offset,
189 length,
190 len: value_len
191 }
192 );
193 }
194 Ok(())
195 }
196
197 pub fn convert_field(field: &Field) -> Field {
200 let value_type = Box::new(field.data_type().clone());
201 Field::new(
202 field.name(),
203 Self::convert_data_type(*value_type),
204 field.is_nullable(),
205 )
206 }
207
208 pub fn convert_data_type(value_type: DataType) -> DataType {
210 DataType::Dictionary(Box::new(Self::key_type()), Box::new(value_type))
211 }
212
213 pub fn values(&self) -> &ArrayRef {
214 self.array.values()
215 }
216
217 pub fn ranges(&self) -> impl Iterator<Item = Option<RangeTuple>> + '_ {
218 self.array
219 .keys()
220 .into_iter()
221 .map(|compound| compound.map(unpack))
222 }
223}
224
225impl std::fmt::Debug for RangeArray {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 let ranges = self
228 .array
229 .keys()
230 .iter()
231 .map(|compound_key| {
232 compound_key.map(|key| {
233 let (offset, length) = unpack(key);
234 offset..(offset + length)
235 })
236 })
237 .collect::<Vec<_>>();
238 f.debug_struct("RangeArray")
239 .field("base array", self.array.values())
240 .field("ranges", &ranges)
241 .finish()
242 }
243}
244
/// Packs an `(offset, length)` pair into one `i64`.
///
/// The two `u32`s are laid out back to back (`offset` first) in native byte
/// order and the resulting eight bytes are reinterpreted as an `i64`.
fn pack(offset: u32, length: u32) -> i64 {
    let mut bytes = [0u8; 8];
    bytes[..4].copy_from_slice(&offset.to_ne_bytes());
    bytes[4..].copy_from_slice(&length.to_ne_bytes());
    i64::from_ne_bytes(bytes)
}
250
/// Splits a packed `i64` key back into its `(offset, length)` pair.
///
/// The key's eight native-order bytes are read as two consecutive `u32`s:
/// the first four bytes are the offset, the last four the length.
fn unpack(compound: i64) -> (u32, u32) {
    let [a, b, c, d, e, f, g, h] = compound.to_ne_bytes();
    let offset = u32::from_ne_bytes([a, b, c, d]);
    let length = u32::from_ne_bytes([e, f, g, h]);
    (offset, length)
}
255
#[cfg(test)]
mod test {
    use std::fmt::Write;
    use std::sync::Arc;

    use datatypes::arrow::array::UInt64Array;

    use super::*;

    /// Renders every range of `range_array` with `{:?}`, one range per line.
    fn expand_format(range_array: &RangeArray) -> String {
        (0..range_array.len()).fold(String::new(), |mut rendered, index| {
            writeln!(rendered, "{:?}", range_array.get(index)).unwrap();
            rendered
        })
    }

    /// Ranges of assorted offsets and lengths (including an empty range that
    /// starts at the very end) resolve to the expected slices.
    #[test]
    fn construct_from_ranges() {
        let base = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));

        let range_array =
            RangeArray::from_ranges(base, [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)])
                .unwrap();
        assert_eq!(range_array.len(), 6);

        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n 1,\
            \n 2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 1,\
            \n 2,\
            \n 3,\
            \n 4,\
            \n 5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 4,\
            \n 5,\
            \n 6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 9,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n])\
            \n",
        );

        assert_eq!(expand_format(&range_array), expected);
    }

    /// A range that reaches past the end of the values array is rejected.
    #[test]
    fn illegal_range() {
        let base = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let outcome = RangeArray::from_ranges(base, [(9, 1)]);
        assert!(outcome.is_err());
    }

    /// Converting to a dictionary array and back keeps every range intact.
    #[test]
    fn dict_array_round_trip() {
        let base = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let windows = [(0, 4), (1, 4), (2, 4), (3, 4), (4, 4), (5, 4)];
        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n 1,\
            \n 2,\
            \n 3,\
            \n 4,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 2,\
            \n 3,\
            \n 4,\
            \n 5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 3,\
            \n 4,\
            \n 5,\
            \n 6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 4,\
            \n 5,\
            \n 6,\
            \n 7,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 5,\
            \n 6,\
            \n 7,\
            \n 8,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n 6,\
            \n 7,\
            \n 8,\
            \n 9,\
            \n])\
            \n",
        );

        let original = RangeArray::from_ranges(base, windows).unwrap();
        assert_eq!(original.len(), 6);
        assert_eq!(expand_format(&original), expected);

        let restored = RangeArray::try_new(original.into_dict()).unwrap();
        assert_eq!(expand_format(&restored), expected);
    }

    /// Building from no ranges at all yields an empty range array.
    #[test]
    fn empty_range_array() {
        let base = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let range_array = RangeArray::from_ranges(base, []).unwrap();
        assert!(range_array.is_empty());
    }
}