promql/
range_array.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! An extended "array" based on [DictionaryArray].
16
17use datafusion::arrow::datatypes::Field;
18use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
19use datatypes::arrow::datatypes::{DataType, Int64Type};
20use snafu::{OptionExt, ensure};
21
22use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result};
23
24pub type RangeTuple = (u32, u32);
25
26/// An compound logical "array" type. Represent serval ranges (slices) of one array.
27/// It's useful to use case like compute sliding window, or range selector from promql.
28///
29/// It's build on top of Arrow's [DictionaryArray]. [DictionaryArray] contains two
30/// sub-arrays, one for dictionary key and another for dictionary value. Both of them
31/// can be arbitrary types, but here the key array is fixed to u32 type.
32///
33/// ```text
34///             │ ┌─────┬─────┬─────┬─────┐
35///      Two    │ │ i64 │ i64 │ i64 │ i64 │      Keys
36///    Arrays   │ └─────┴─────┴─────┴─────┘     (Fixed to i64)
37///      In     │
38///   Arrow's   │ ┌────────────────────────────┐
39/// Dictionary  │ │                            │ Values
40///    Array    │ └────────────────────────────┘(Any Type)
41/// ```
42///
43/// Because the i64 key array is reinterpreted into two u32 for offset and length
44/// in [RangeArray] to represent a "range":
45///
46/// ```text
47/// 63            32│31             0
48/// ┌───────────────┼───────────────┐
49/// │  offset (u32) │  length (u32) │
50/// └───────────────┼───────────────┘
51/// ```
52///
53/// Then the [DictionaryArray] can be expanded to several ranges like this:
54///
55/// ```text
56/// Keys
57/// ┌───────┬───────┬───────┐      ┌───────┐
58/// │ (0,2) │ (1,2) │ (2,2) │ ─┐   │[A,B,C]│ values.slice(0,2)
59/// └───────┴───────┴───────┘  │   ├───────┤
60///  Values                    ├─► │[B,C,D]│ values.slice(1,2)
61/// ┌───┬───┬───┬───┬───┐      │   ├───────┤
62/// │ A │ B │ C │ D │ E │     ─┘   │[C,D,E]│ values.slice(2,2)
63/// └───┴───┴───┴───┴───┘          └───────┘
64/// ```
65pub struct RangeArray {
66    array: DictionaryArray<Int64Type>,
67}
68
69impl RangeArray {
70    pub const fn key_type() -> DataType {
71        DataType::Int64
72    }
73
74    pub fn value_type(&self) -> DataType {
75        self.array.value_type()
76    }
77
78    pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
79        let ranges_iter = dict
80            .keys()
81            .iter()
82            .map(|compound_key| compound_key.map(unpack))
83            .collect::<Option<Vec<_>>>()
84            .context(EmptyRangeSnafu)?;
85        Self::check_ranges(dict.values().len(), ranges_iter)?;
86
87        Ok(Self { array: dict })
88    }
89
90    pub fn from_ranges<R>(values: ArrayRef, ranges: R) -> Result<Self>
91    where
92        R: IntoIterator<Item = RangeTuple> + Clone,
93    {
94        Self::check_ranges(values.len(), ranges.clone())?;
95
96        unsafe { Ok(Self::from_ranges_unchecked(values, ranges)) }
97    }
98
99    /// Construct [RangeArray] from given range without checking its validity.
100    ///
101    /// # Safety
102    ///
103    /// Caller should ensure the given range are valid. Otherwise use [`from_ranges`]
104    /// instead.
105    ///
106    /// [`from_ranges`]: crate::range_array::RangeArray#method.from_ranges
107    pub unsafe fn from_ranges_unchecked<R>(values: ArrayRef, ranges: R) -> Self
108    where
109        R: IntoIterator<Item = RangeTuple>,
110    {
111        let key_array = Int64Array::from_iter(
112            ranges
113                .into_iter()
114                .map(|(offset, length)| pack(offset, length)),
115        );
116
117        // Build from ArrayData to bypass the "offset" checker. Because
118        // we are not using "keys" as-is.
119        // This paragraph is copied from arrow-rs dictionary_array.rs `try_new()`.
120        let mut data = ArrayData::builder(DataType::Dictionary(
121            Box::new(Self::key_type()),
122            Box::new(values.data_type().clone()),
123        ))
124        .len(key_array.len())
125        .add_buffer(key_array.to_data().buffers()[0].clone())
126        .add_child_data(values.to_data());
127        match key_array.to_data().nulls() {
128            Some(buffer) if key_array.to_data().null_count() > 0 => {
129                data = data
130                    .nulls(Some(buffer.clone()))
131                    .null_count(key_array.to_data().null_count());
132            }
133            _ => data = data.null_count(0),
134        }
135        let array_data = unsafe { data.build_unchecked() };
136
137        Self {
138            array: array_data.into(),
139        }
140    }
141
142    pub fn len(&self) -> usize {
143        self.array.keys().len()
144    }
145
146    pub fn is_empty(&self) -> bool {
147        self.array.keys().is_empty()
148    }
149
150    pub fn get(&self, index: usize) -> Option<ArrayRef> {
151        if index >= self.len() {
152            return None;
153        }
154
155        let compound_key = self.array.keys().value(index);
156        let (offset, length) = unpack(compound_key);
157        let array = self.array.values().slice(offset as usize, length as usize);
158
159        Some(array)
160    }
161
162    pub fn get_offset_length(&self, index: usize) -> Option<(usize, usize)> {
163        if index >= self.len() {
164            return None;
165        }
166
167        let compound_key = self.array.keys().value(index);
168        let (offset, length) = unpack(compound_key);
169
170        Some((offset as usize, length as usize))
171    }
172
173    /// Return the underlying Arrow's [DictionaryArray]. Notes the dictionary array might be
174    /// invalid from Arrow's definition. Be care if try to access this dictionary through
175    /// Arrow's API.
176    pub fn into_dict(self) -> DictionaryArray<Int64Type> {
177        self.array
178    }
179
180    fn check_ranges<R>(value_len: usize, ranges: R) -> Result<()>
181    where
182        R: IntoIterator<Item = RangeTuple>,
183    {
184        for (offset, length) in ranges.into_iter() {
185            ensure!(
186                offset as usize + length as usize <= value_len,
187                IllegalRangeSnafu {
188                    offset,
189                    length,
190                    len: value_len
191                }
192            );
193        }
194        Ok(())
195    }
196
197    /// Change the field's datatype to the type after processed by [RangeArray].
198    /// Like `Utf8` will become `Dictionary<Int64, Utf8>`.
199    pub fn convert_field(field: &Field) -> Field {
200        let value_type = Box::new(field.data_type().clone());
201        Field::new(
202            field.name(),
203            Self::convert_data_type(*value_type),
204            field.is_nullable(),
205        )
206    }
207
208    /// Build datatype of wrapped [RangeArray] on given value type.
209    pub fn convert_data_type(value_type: DataType) -> DataType {
210        DataType::Dictionary(Box::new(Self::key_type()), Box::new(value_type))
211    }
212
213    pub fn values(&self) -> &ArrayRef {
214        self.array.values()
215    }
216
217    pub fn ranges(&self) -> impl Iterator<Item = Option<RangeTuple>> + '_ {
218        self.array
219            .keys()
220            .into_iter()
221            .map(|compound| compound.map(unpack))
222    }
223}
224
225impl std::fmt::Debug for RangeArray {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        let ranges = self
228            .array
229            .keys()
230            .iter()
231            .map(|compound_key| {
232                compound_key.map(|key| {
233                    let (offset, length) = unpack(key);
234                    offset..(offset + length)
235                })
236            })
237            .collect::<Vec<_>>();
238        f.debug_struct("RangeArray")
239            .field("base array", self.array.values())
240            .field("ranges", &ranges)
241            .finish()
242    }
243}
244
// util functions

/// Packs an `(offset, length)` pair into one `i64` dictionary key, with
/// `offset` in the low 32 bits and `length` in the high 32 bits.
///
/// Explicit shifts, rather than reinterpreting the in-memory bytes of
/// `[offset, length]`, give the same layout on every target. On little-endian
/// targets the result is bit-for-bit identical to that byte reinterpretation.
fn pack(offset: u32, length: u32) -> i64 {
    ((u64::from(length) << 32) | u64::from(offset)) as i64
}

/// Inverse of [`pack`]: splits a dictionary key back into `(offset, length)`.
fn unpack(compound: i64) -> (u32, u32) {
    let bits = compound as u64;
    // A truncating cast keeps the low 32 bits.
    (bits as u32, (bits >> 32) as u32)
}
255
#[cfg(test)]
mod test {
    use std::fmt::Write;
    use std::sync::Arc;

    use datatypes::arrow::array::UInt64Array;

    use super::*;

    /// Renders `get(i)` for every index, one per line, for comparison against
    /// an expected string.
    fn expand_format(range_array: &RangeArray) -> String {
        let mut result = String::new();
        for i in 0..range_array.len() {
            writeln!(result, "{:?}", range_array.get(i)).unwrap();
        }
        result
    }

    #[test]
    fn construct_from_ranges() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert_eq!(range_array.len(), 6);

        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n  3,\
            \n  4,\
            \n  5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  2,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  4,\
            \n  5,\
            \n  6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  9,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n])\
            \n",
        );

        let formatted = expand_format(&range_array);
        assert_eq!(formatted, expected);
    }

    #[test]
    fn illegal_range() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(9, 1)];
        assert!(RangeArray::from_ranges(values_array, ranges).is_err());
    }

    #[test]
    fn illegal_range_with_huge_offset_and_length() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3]));
        let ranges = [(u32::MAX, u32::MAX)];
        assert!(RangeArray::from_ranges(values_array, ranges).is_err());
    }

    #[test]
    fn dict_array_round_trip() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [(0, 4), (1, 4), (2, 4), (3, 4), (4, 4), (5, 4)];
        let expected = String::from(
            "Some(PrimitiveArray<UInt64>\
            \n[\
            \n  1,\
            \n  2,\
            \n  3,\
            \n  4,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  2,\
            \n  3,\
            \n  4,\
            \n  5,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  3,\
            \n  4,\
            \n  5,\
            \n  6,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  4,\
            \n  5,\
            \n  6,\
            \n  7,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  5,\
            \n  6,\
            \n  7,\
            \n  8,\
            \n])\
            \nSome(PrimitiveArray<UInt64>\
            \n[\
            \n  6,\
            \n  7,\
            \n  8,\
            \n  9,\
            \n])\
            \n",
        );

        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert_eq!(range_array.len(), 6);
        let formatted = expand_format(&range_array);
        assert_eq!(formatted, expected);

        // this dict array is invalid from Arrow's definition
        let dict_array = range_array.into_dict();
        let rounded_range_array = RangeArray::try_new(dict_array).unwrap();
        let formatted = expand_format(&rounded_range_array);
        assert_eq!(formatted, expected);
    }

    #[test]
    fn empty_range_array() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let ranges = [];
        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        assert!(range_array.is_empty());
    }

    #[test]
    fn pack_unpack_round_trip() {
        for (offset, length) in [
            (0, 0),
            (1, 2),
            (u32::MAX, 0),
            (0, u32::MAX),
            (u32::MAX, u32::MAX),
        ] {
            assert_eq!(unpack(pack(offset, length)), (offset, length));
        }
    }

    #[test]
    fn index_accessors() {
        let values_array = Arc::new(UInt64Array::from_iter([1, 2, 3]));
        let range_array = RangeArray::from_ranges(values_array, [(0, 1), (1, 2)]).unwrap();

        assert_eq!(range_array.get_offset_length(1), Some((1, 2)));
        assert!(range_array.get(2).is_none());
        assert!(range_array.get_offset_length(2).is_none());
        assert_eq!(
            range_array.ranges().collect::<Vec<_>>(),
            vec![Some((0, 1)), Some((1, 2))]
        );
    }

    #[test]
    fn try_new_rejects_null_key() {
        let values_array: ArrayRef = Arc::new(UInt64Array::from_iter([1, 2, 3]));
        let keys = Int64Array::from(vec![Some(0), None]);
        let dict = DictionaryArray::<Int64Type>::new(keys, values_array);
        assert!(RangeArray::try_new(dict).is_err());
    }
}
396}