promql/
range_array.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! An extended "array" based on [DictionaryArray].
16
17use std::sync::Arc;
18
19use datafusion::arrow::buffer::NullBuffer;
20use datafusion::arrow::datatypes::Field;
21use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
22use datatypes::arrow::datatypes::{DataType, Int64Type};
23use snafu::{ensure, OptionExt};
24
25use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result};
26
/// One range (slice) of a values array, as an `(offset, length)` pair.
pub type RangeTuple = (u32, u32);
28
29/// An compound logical "array" type. Represent serval ranges (slices) of one array.
30/// It's useful to use case like compute sliding window, or range selector from promql.
31///
32/// It's build on top of Arrow's [DictionaryArray]. [DictionaryArray] contains two
33/// sub-arrays, one for dictionary key and another for dictionary value. Both of them
34/// can be arbitrary types, but here the key array is fixed to u32 type.
35///
36/// ```text
37///             │ ┌─────┬─────┬─────┬─────┐
38///      Two    │ │ i64 │ i64 │ i64 │ i64 │      Keys
39///    Arrays   │ └─────┴─────┴─────┴─────┘     (Fixed to i64)
40///      In     │
41///   Arrow's   │ ┌────────────────────────────┐
42/// Dictionary  │ │                            │ Values
43///    Array    │ └────────────────────────────┘(Any Type)
44/// ```
45///
46/// Because the i64 key array is reinterpreted into two u32 for offset and length
47/// in [RangeArray] to represent a "range":
48///
49/// ```text
50/// 63            32│31             0
51/// ┌───────────────┼───────────────┐
52/// │  offset (u32) │  length (u32) │
53/// └───────────────┼───────────────┘
54/// ```
55///
56/// Then the [DictionaryArray] can be expanded to several ranges like this:
57///
58/// ```text
59/// Keys
60/// ┌───────┬───────┬───────┐      ┌───────┐
61/// │ (0,2) │ (1,2) │ (2,2) │ ─┐   │[A,B,C]│ values.slice(0,2)
62/// └───────┴───────┴───────┘  │   ├───────┤
63///  Values                    ├─► │[B,C,D]│ values.slice(1,2)
64/// ┌───┬───┬───┬───┬───┐      │   ├───────┤
65/// │ A │ B │ C │ D │ E │     ─┘   │[C,D,E]│ values.slice(2,2)
66/// └───┴───┴───┴───┴───┘          └───────┘
67/// ```
68pub struct RangeArray {
69    array: DictionaryArray<Int64Type>,
70}
71
72impl RangeArray {
73    pub const fn key_type() -> DataType {
74        DataType::Int64
75    }
76
77    pub fn value_type(&self) -> DataType {
78        self.array.value_type()
79    }
80
81    pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
82        let ranges_iter = dict
83            .keys()
84            .iter()
85            .map(|compound_key| compound_key.map(unpack))
86            .collect::<Option<Vec<_>>>()
87            .context(EmptyRangeSnafu)?;
88        Self::check_ranges(dict.values().len(), ranges_iter)?;
89
90        Ok(Self { array: dict })
91    }
92
93    pub fn from_ranges<R>(values: ArrayRef, ranges: R) -> Result<Self>
94    where
95        R: IntoIterator<Item = RangeTuple> + Clone,
96    {
97        Self::check_ranges(values.len(), ranges.clone())?;
98
99        unsafe { Ok(Self::from_ranges_unchecked(values, ranges)) }
100    }
101
102    /// Construct [RangeArray] from given range without checking its validity.
103    ///
104    /// # Safety
105    ///
106    /// Caller should ensure the given range are valid. Otherwise use [`from_ranges`]
107    /// instead.
108    ///
109    /// [`from_ranges`]: crate::range_array::RangeArray#method.from_ranges
110    pub unsafe fn from_ranges_unchecked<R>(values: ArrayRef, ranges: R) -> Self
111    where
112        R: IntoIterator<Item = RangeTuple>,
113    {
114        let key_array = Int64Array::from_iter(
115            ranges
116                .into_iter()
117                .map(|(offset, length)| pack(offset, length)),
118        );
119
120        // Build from ArrayData to bypass the "offset" checker. Because
121        // we are not using "keys" as-is.
122        // This paragraph is copied from arrow-rs dictionary_array.rs `try_new()`.
123        let mut data = ArrayData::builder(DataType::Dictionary(
124            Box::new(Self::key_type()),
125            Box::new(values.data_type().clone()),
126        ))
127        .len(key_array.len())
128        .add_buffer(key_array.to_data().buffers()[0].clone())
129        .add_child_data(values.to_data());
130        match key_array.to_data().nulls() {
131            Some(buffer) if key_array.to_data().null_count() > 0 => {
132                data = data
133                    .nulls(Some(buffer.clone()))
134                    .null_count(key_array.to_data().null_count());
135            }
136            _ => data = data.null_count(0),
137        }
138        let array_data = unsafe { data.build_unchecked() };
139
140        Self {
141            array: array_data.into(),
142        }
143    }
144
145    pub fn len(&self) -> usize {
146        self.array.keys().len()
147    }
148
149    pub fn is_empty(&self) -> bool {
150        self.array.keys().is_empty()
151    }
152
153    pub fn get(&self, index: usize) -> Option<ArrayRef> {
154        if index >= self.len() {
155            return None;
156        }
157
158        let compound_key = self.array.keys().value(index);
159        let (offset, length) = unpack(compound_key);
160        let array = self.array.values().slice(offset as usize, length as usize);
161
162        Some(array)
163    }
164
165    pub fn get_offset_length(&self, index: usize) -> Option<(usize, usize)> {
166        if index >= self.len() {
167            return None;
168        }
169
170        let compound_key = self.array.keys().value(index);
171        let (offset, length) = unpack(compound_key);
172
173        Some((offset as usize, length as usize))
174    }
175
176    /// Return the underlying Arrow's [DictionaryArray]. Notes the dictionary array might be
177    /// invalid from Arrow's definition. Be care if try to access this dictionary through
178    /// Arrow's API.
179    pub fn into_dict(self) -> DictionaryArray<Int64Type> {
180        self.array
181    }
182
183    fn check_ranges<R>(value_len: usize, ranges: R) -> Result<()>
184    where
185        R: IntoIterator<Item = RangeTuple>,
186    {
187        for (offset, length) in ranges.into_iter() {
188            ensure!(
189                offset as usize + length as usize <= value_len,
190                IllegalRangeSnafu {
191                    offset,
192                    length,
193                    len: value_len
194                }
195            );
196        }
197        Ok(())
198    }
199
200    /// Change the field's datatype to the type after processed by [RangeArray].
201    /// Like `Utf8` will become `Dictionary<Int64, Utf8>`.
202    pub fn convert_field(field: &Field) -> Field {
203        let value_type = Box::new(field.data_type().clone());
204        Field::new(
205            field.name(),
206            Self::convert_data_type(*value_type),
207            field.is_nullable(),
208        )
209    }
210
211    /// Build datatype of wrapped [RangeArray] on given value type.
212    pub fn convert_data_type(value_type: DataType) -> DataType {
213        DataType::Dictionary(Box::new(Self::key_type()), Box::new(value_type))
214    }
215
216    pub fn values(&self) -> &ArrayRef {
217        self.array.values()
218    }
219
220    pub fn ranges(&self) -> impl Iterator<Item = Option<RangeTuple>> + '_ {
221        self.array
222            .keys()
223            .into_iter()
224            .map(|compound| compound.map(unpack))
225    }
226}
227
228impl Array for RangeArray {
229    fn as_any(&self) -> &dyn std::any::Any {
230        self
231    }
232
233    fn into_data(self) -> ArrayData {
234        self.array.into_data()
235    }
236
237    fn to_data(&self) -> ArrayData {
238        self.array.to_data()
239    }
240
241    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
242        Arc::new(self.array.slice(offset, length))
243    }
244
245    fn nulls(&self) -> Option<&NullBuffer> {
246        self.array.nulls()
247    }
248
249    fn data_type(&self) -> &DataType {
250        self.array.data_type()
251    }
252
253    fn len(&self) -> usize {
254        self.len()
255    }
256
257    fn is_empty(&self) -> bool {
258        self.is_empty()
259    }
260
261    fn offset(&self) -> usize {
262        self.array.offset()
263    }
264
265    fn get_buffer_memory_size(&self) -> usize {
266        self.array.get_buffer_memory_size()
267    }
268
269    fn get_array_memory_size(&self) -> usize {
270        self.array.get_array_memory_size()
271    }
272}
273
274impl std::fmt::Debug for RangeArray {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        let ranges = self
277            .array
278            .keys()
279            .iter()
280            .map(|compound_key| {
281                compound_key.map(|key| {
282                    let (offset, length) = unpack(key);
283                    offset..(offset + length)
284                })
285            })
286            .collect::<Vec<_>>();
287        f.debug_struct("RangeArray")
288            .field("base array", self.array.values())
289            .field("ranges", &ranges)
290            .finish()
291    }
292}
293
// util functions

/// Packs an `(offset, length)` range into one `i64` dictionary key.
///
/// `offset` occupies the low 32 bits and `length` the high 32 bits. Explicit shifts
/// (instead of a byte-level cast) keep this layout identical on every target
/// endianness; on little-endian targets it equals a `[offset, length]` `u32` pair
/// reinterpreted in place.
fn pack(offset: u32, length: u32) -> i64 {
    (u64::from(offset) | (u64::from(length) << 32)) as i64
}

/// Inverse of [`pack`]: splits a packed `i64` key back into `(offset, length)`.
fn unpack(compound: i64) -> (u32, u32) {
    let bits = compound as u64;
    // Truncating casts are intentional: each half is exactly 32 bits wide.
    (bits as u32, (bits >> 32) as u32)
}
304
#[cfg(test)]
mod test {
    use std::fmt::Write;
    use std::sync::Arc;

    use datatypes::arrow::array::UInt64Array;

    use super::*;

    /// Renders the `Debug` form of every range of `range_array`, one per line.
    fn expand_format(range_array: &RangeArray) -> String {
        let mut result = String::new();
        for i in 0..range_array.len() {
            writeln!(result, "{:?}", range_array.get(i)).unwrap();
        }
        result
    }

    #[test]
    fn construct_from_ranges() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert_eq!(range_array.len(), 6);

        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n  3,\
            \n  4,\
            \n  5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  4,\
            \n  5,\
            \n  6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  9,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n])\
            \n",
        );

        let formatted = expand_format(&range_array);
        assert_eq!(formatted, expected);
    }

    #[test]
    fn illegal_range() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(9, 1)];
        assert!(RangeArray::from_ranges(values_array, ranges).is_err());
    }

    #[test]
    fn dict_array_round_trip() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(0, 4), (1, 4), (2, 4), (3, 4), (4, 4), (5, 4)];
        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n  3,\
            \n  4,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  2,\
            \n  3,\
            \n  4,\
            \n  5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  3,\
            \n  4,\
            \n  5,\
            \n  6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  4,\
            \n  5,\
            \n  6,\
            \n  7,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  5,\
            \n  6,\
            \n  7,\
            \n  8,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  6,\
            \n  7,\
            \n  8,\
            \n  9,\
            \n])\
            \n",
        );

        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert_eq!(range_array.len(), 6);
        let formatted = expand_format(&range_array);
        assert_eq!(formatted, expected);

        // this dict array is invalid from Arrow's definition
        let dict_array = range_array.into_dict();
        let rounded_range_array = RangeArray::try_new(dict_array).unwrap();
        let formatted = expand_format(&rounded_range_array);
        assert_eq!(formatted, expected);
    }

    #[test]
    fn empty_range_array() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [];
        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert!(range_array.is_empty());
    }

    #[test]
    fn out_of_bounds_index() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3]));
        let range_array = RangeArray::from_ranges(values_array, [(0, 2), (1, 2)]).unwrap();
        assert_eq!(range_array.get_offset_length(1), Some((1, 2)));
        assert!(range_array.get(2).is_none());
        assert!(range_array.get_offset_length(2).is_none());
    }

    #[test]
    fn ranges_round_trip() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4]));
        let ranges = [(0, 1), (1, 3), (4, 0)];
        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        let collected = range_array.ranges().collect::<Vec<_>>();
        assert_eq!(collected, ranges.map(Some).to_vec());
    }

    #[test]
    fn pack_unpack_round_trip() {
        let cases = [
            (0, 0),
            (1, 2),
            (u32::MAX, 0),
            (0, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (offset, length) in cases {
            assert_eq!(unpack(pack(offset, length)), (offset, length));
        }
    }

    #[test]
    fn try_new_rejects_null_key() {
        // Key `0` unpacks to the empty range `(0, 0)`, which is valid.
        let valid: DictionaryArray<Int64Type> = vec![Some("a")].into_iter().collect();
        assert!(RangeArray::try_new(valid).is_ok());

        let with_null: DictionaryArray<Int64Type> =
            vec![Some("a"), None].into_iter().collect();
        assert!(RangeArray::try_new(with_null).is_err());
    }
}