common_time/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16
17use serde::{Deserialize, Serialize};
18
19use crate::timestamp::TimeUnit;
20use crate::timestamp_millis::TimestampMillis;
21use crate::Timestamp;
22
/// A half-open range `[start, end)` over values of type `T`.
///
/// A value is inside the range when `value >= start` and `value < end`.
/// A bound of `None` leaves that side of the range unbounded.
///
/// A range whose bounds are both present and equal contains no values. The
/// canonical empty range (see `empty`) uses the default value of `T` for both
/// bounds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct GenericRange<T> {
    /// Inclusive lower bound; `None` means unbounded below.
    start: Option<T>,
    /// Exclusive upper bound; `None` means unbounded above.
    end: Option<T>,
}
33
34impl<T> GenericRange<T>
35where
36    T: Copy + PartialOrd + Default,
37{
38    /// Computes the AND'ed range with other.  
39    pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
40        let start = match (self.start(), other.start()) {
41            (Some(l), Some(r)) => {
42                if l > r {
43                    Some(*l)
44                } else {
45                    Some(*r)
46                }
47            }
48            (Some(l), None) => Some(*l),
49            (None, Some(r)) => Some(*r),
50            (None, None) => None,
51        };
52
53        let end = match (self.end(), other.end()) {
54            (Some(l), Some(r)) => {
55                if l > r {
56                    Some(*r)
57                } else {
58                    Some(*l)
59                }
60            }
61            (Some(l), None) => Some(*l),
62            (None, Some(r)) => Some(*r),
63            (None, None) => None,
64        };
65
66        Self::from_optional(start, end)
67    }
68
69    /// Compute the OR'ed range of two ranges.
70    /// Notice: this method does not compute the exact OR'ed operation for simplicity.
71    /// For example, `[1, 2)` or'ed with `[4, 5)` will produce `[1, 5)`
72    /// instead of `[1, 2) ∪ [4, 5)`
73    pub fn or(&self, other: &GenericRange<T>) -> GenericRange<T> {
74        if self.is_empty() {
75            return *other;
76        }
77
78        if other.is_empty() {
79            return *self;
80        }
81        let start = match (self.start(), other.start()) {
82            (Some(l), Some(r)) => {
83                if l > r {
84                    Some(*r)
85                } else {
86                    Some(*l)
87                }
88            }
89            (Some(_), None) => None,
90            (None, Some(_)) => None,
91            (None, None) => None,
92        };
93
94        let end = match (self.end(), other.end()) {
95            (Some(l), Some(r)) => {
96                if l > r {
97                    Some(*l)
98                } else {
99                    Some(*r)
100                }
101            }
102            (Some(_), None) => None,
103            (None, Some(_)) => None,
104            (None, None) => None,
105        };
106
107        Self::from_optional(start, end)
108    }
109
110    /// Checks if current range intersect with target.
111    pub fn intersects(&self, target: &GenericRange<T>) -> bool {
112        !self.and(target).is_empty()
113    }
114
115    /// Create an empty range.
116    pub fn empty() -> GenericRange<T> {
117        GenericRange {
118            start: Some(T::default()),
119            end: Some(T::default()),
120        }
121    }
122
123    /// Create GenericRange from optional start and end.
124    /// If the present value of start >= the present value of end, it will return an empty range
125    /// with the default value of `T`.
126    fn from_optional(start: Option<T>, end: Option<T>) -> GenericRange<T> {
127        match (start, end) {
128            (Some(start_val), Some(end_val)) => {
129                if start_val < end_val {
130                    Self {
131                        start: Some(start_val),
132                        end: Some(end_val),
133                    }
134                } else {
135                    Self::empty()
136                }
137            }
138            (s, e) => Self { start: s, end: e },
139        }
140    }
141}
142
143impl<T> GenericRange<T> {
144    /// Creates a new range that contains values in `[start, end)`.
145    ///
146    /// Returns `None` if `start` > `end`.
147    pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
148        if start <= end {
149            let (start, end) = (Some(start.into()), Some(end.into()));
150            Some(GenericRange { start, end })
151        } else {
152            None
153        }
154    }
155
156    /// Return a range containing all possible values.
157    pub fn min_to_max() -> GenericRange<T> {
158        Self {
159            start: None,
160            end: None,
161        }
162    }
163
164    /// Returns the lower bound of the range (inclusive).
165    #[inline]
166    pub fn start(&self) -> &Option<T> {
167        &self.start
168    }
169
170    /// Returns the upper bound of the range (exclusive).
171    #[inline]
172    pub fn end(&self) -> &Option<T> {
173        &self.end
174    }
175
176    /// Returns true if `timestamp` is contained in the range.
177    pub fn contains<U: PartialOrd<T>>(&self, target: &U) -> bool {
178        match (&self.start, &self.end) {
179            (Some(start), Some(end)) => *target >= *start && *target < *end,
180            (Some(start), None) => *target >= *start,
181            (None, Some(end)) => *target < *end,
182            (None, None) => true,
183        }
184    }
185}
186
187impl<T: PartialOrd> GenericRange<T> {
188    /// Returns true if the range contains no timestamps.
189    #[inline]
190    pub fn is_empty(&self) -> bool {
191        match (&self.start, &self.end) {
192            (Some(start), Some(end)) => start == end,
193            _ => false,
194        }
195    }
196}
197
/// A half-open range whose bounds are [`Timestamp`] values.
pub type TimestampRange = GenericRange<Timestamp>;
199
200impl Display for TimestampRange {
201    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202        let s = match (&self.start, &self.end) {
203            (Some(start), Some(end)) => {
204                format!(
205                    "TimestampRange{{[{}{},{}{})}}",
206                    start.value(),
207                    start.unit().short_name(),
208                    end.value(),
209                    end.unit().short_name()
210                )
211            }
212            (Some(start), None) => {
213                format!(
214                    "TimestampRange{{[{}{},#)}}",
215                    start.value(),
216                    start.unit().short_name()
217                )
218            }
219            (None, Some(end)) => {
220                format!(
221                    "TimestampRange{{[#,{}{})}}",
222                    end.value(),
223                    end.unit().short_name()
224                )
225            }
226            (None, None) => "TimestampRange{{[#,#)}}".to_string(),
227        };
228        f.write_str(&s)
229    }
230}
231
232impl TimestampRange {
233    /// Create a TimestampRange with optional inclusive end timestamp.
234    /// If end timestamp is present and is less than start timestamp, this method will return
235    /// an empty range.
236    /// ### Caveat
237    /// If the given end timestamp's value is `i64::MAX`, which will result into overflow when added
238    /// by 1(the end is inclusive), this method does not try to convert the time unit of end
239    /// timestamp, instead it just return `[start, INF)`. This exaggerates the range but does not
240    /// affect correctness.  
241    pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
242        // check for emptiness
243        if let (Some(start_ts), Some(end_ts)) = (start, end) {
244            if start_ts > end_ts {
245                return Self::empty();
246            }
247        }
248
249        let end = if let Some(end) = end {
250            end.value()
251                .checked_add(1)
252                .map(|v| Timestamp::new(v, end.unit()))
253        } else {
254            None
255        };
256        Self::from_optional(start, end)
257    }
258
259    /// Shortcut method to create a timestamp range with given start/end value and time unit.
260    /// Returns empty iff `start` > `end`.
261    pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
262        let start = Timestamp::new(start, unit);
263        let end = Timestamp::new(end, unit);
264        Self::new(start, end)
265    }
266
267    /// Create a range that containing only given `ts`.
268    /// ### Notice:
269    /// Left-close right-open range cannot properly represent range with a single value.
270    /// For simplicity, this implementation returns an approximate range `[ts, ts+1)` instead.
271    pub fn single(ts: Timestamp) -> Self {
272        let unit = ts.unit();
273        let start = Some(ts);
274        let end = ts.value().checked_add(1).map(|v| Timestamp::new(v, unit));
275
276        Self::from_optional(start, end)
277    }
278
279    /// Create a range `[start, INF)`.
280    /// ### Notice
281    /// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`.
282    /// You may resort to `[start-1, ...)` instead.
283    pub fn from_start(start: Timestamp) -> Self {
284        Self {
285            start: Some(start),
286            end: None,
287        }
288    }
289
290    /// Create a range `[-INF, end)`.
291    /// ### Notice
292    /// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`.
293    /// If `inclusive` is true, this method returns `[-INF, end+1)` instead.
294    pub fn until_end(end: Timestamp, inclusive: bool) -> Self {
295        let end = if inclusive {
296            end.value()
297                .checked_add(1)
298                .map(|v| Timestamp::new(v, end.unit()))
299        } else {
300            Some(end)
301        };
302        Self { start: None, end }
303    }
304}
305
/// A half-open time range whose bounds are [`TimestampMillis`] values (milliseconds).
pub type RangeMillis = GenericRange<TimestampMillis>;
308
#[cfg(test)]
mod tests {

    use super::*;

    /// Construction through `new`/`empty` and the bound accessors.
    #[test]
    fn test_new_range() {
        let (start, end) = (TimestampMillis::new(0), TimestampMillis::new(100));
        let range = RangeMillis::new(start, end).unwrap();

        assert_eq!(Some(start), *range.start());
        assert_eq!(Some(end), *range.end());

        let range2 = RangeMillis::new(0, 100).unwrap();
        assert_eq!(range, range2);

        let range_eq = RangeMillis::new(123, 123).unwrap();
        assert_eq!(range_eq.start(), range_eq.end());

        // reversed bounds are rejected
        assert_eq!(None, RangeMillis::new(1, 0));

        let range = RangeMillis::empty();
        assert_eq!(range.start(), range.end());
        assert_eq!(Some(TimestampMillis::new(0)), *range.start());
    }

    /// `new_inclusive` with bounds in different units and with an end at `i64::MAX`.
    #[test]
    fn test_timestamp_range_new_inclusive() {
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new(i64::MAX - 1, TimeUnit::Second)),
            Some(Timestamp::new(i64::MAX, TimeUnit::Millisecond)),
        );
        assert!(range.is_empty());

        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new(1, TimeUnit::Second)),
            Some(Timestamp::new(1, TimeUnit::Millisecond)),
        );
        assert!(range.is_empty());

        // end + 1 overflows, so the upper bound becomes unbounded
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new(1, TimeUnit::Second)),
            Some(Timestamp::new(i64::MAX, TimeUnit::Millisecond)),
        );
        assert!(range.end.is_none());
    }

    /// `contains` honours the inclusive start and the exclusive end.
    #[test]
    fn test_range_contains() {
        let range = RangeMillis::new(-10, 10).unwrap();
        assert!(!range.is_empty());
        assert!(range.contains(&-10));
        assert!(range.contains(&0));
        assert!(range.contains(&9));
        assert!(!range.contains(&10));

        let range = RangeMillis::new(i64::MIN, i64::MAX).unwrap();
        assert!(!range.is_empty());
        assert!(range.contains(&TimestampMillis::MIN));
        assert!(range.contains(&TimestampMillis::MAX));
        assert!(!range.contains(&TimestampMillis::INF));

        let range = RangeMillis::new(0, 0).unwrap();
        assert!(range.is_empty());
        assert!(!range.contains(&0));
    }

    /// Ranges describing the same instants in different units compare equal.
    #[test]
    fn test_range_with_diff_unit() {
        let r1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
        let r2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
        assert_eq!(r2, r1);

        let r3 = TimestampRange::with_unit(0, 0, TimeUnit::Second).unwrap();
        let r4 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
        assert_eq!(r3, r4);

        let r5 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
        let r6 = TimestampRange::with_unit(0, 0, TimeUnit::Microsecond).unwrap();
        assert_eq!(r5, r6);

        let r5 = TimestampRange::with_unit(-1, 1, TimeUnit::Second).unwrap();
        let r6 = TimestampRange::with_unit(-1000, 1000, TimeUnit::Millisecond).unwrap();
        assert_eq!(r5, r6);
    }

    /// Intersection of overlapping, disjoint, empty and mixed-unit ranges.
    #[test]
    fn test_range_and() {
        assert_eq!(
            TimestampRange::with_unit(5, 10, TimeUnit::Millisecond).unwrap(),
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
                .unwrap()
                .and(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
        );

        let empty = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
            .unwrap()
            .and(&TimestampRange::with_unit(11, 20, TimeUnit::Millisecond).unwrap());
        assert!(empty.is_empty());

        // whatever AND'ed with empty shall return empty
        let empty_and_all = empty.and(&TimestampRange::min_to_max());
        assert!(empty_and_all.is_empty());
        assert!(empty.and(&empty).is_empty());
        assert!(empty
            .and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap())
            .is_empty());

        // AND TimestampRange with different unit
        let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
            .unwrap()
            .and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap());
        assert_eq!(
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
            anded
        );

        // same operands in the opposite order must give an equal range
        let anded = TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond)
            .unwrap()
            .and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap());
        assert_eq!(
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
            anded
        );
    }

    /// The covering range produced by `or`, including empty and unbounded operands.
    #[test]
    fn test_range_or() {
        assert_eq!(
            TimestampRange::with_unit(1, 20, TimeUnit::Millisecond).unwrap(),
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
                .unwrap()
                .or(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
        );

        assert_eq!(
            TimestampRange::min_to_max(),
            TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
        );

        let empty = TimestampRange::empty().or(&TimestampRange::empty());
        assert!(empty.is_empty());

        // disjoint ranges: the gap between them is filled in
        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
        assert_eq!(
            TimestampRange::with_unit(-30, 0, TimeUnit::Second).unwrap(),
            t1.or(&t2)
        );

        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        assert_eq!(t1, t1.or(&t1));
    }

    /// `intersects` for disjoint, nested, adjacent, mixed-unit, empty and full ranges.
    #[test]
    fn test_intersect() {
        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
        assert!(!t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(10, 20, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(0, 30, TimeUnit::Second).unwrap();
        assert!(t1.intersects(&t2));

        // adjacent ranges share no value because the end is exclusive
        let t1 = TimestampRange::with_unit(-20, -10, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        assert!(!t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(999, 1000, TimeUnit::Millisecond).unwrap();
        assert!(t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
        assert!(t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
        assert!(t1.intersects(&t1));

        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::empty();
        assert!(!t1.intersects(&t2));

        // empty range does not intersect with empty range
        let empty = TimestampRange::empty();
        assert!(!empty.intersects(&empty));

        // full range intersects with full range
        let full = TimestampRange::min_to_max();
        assert!(full.intersects(&full));

        assert!(!full.intersects(&empty));
    }

    /// `new_inclusive` keeps the end value inside the resulting range.
    #[test]
    fn test_new_inclusive() {
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new_millisecond(1)),
            Some(Timestamp::new_millisecond(3)),
        );
        assert!(!range.is_empty());
        assert!(range.contains(&Timestamp::new_millisecond(1)));
        assert!(range.contains(&Timestamp::new_millisecond(3)));

        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new_millisecond(1)),
            Some(Timestamp::new_millisecond(1)),
        );
        assert!(!range.is_empty());
        assert_eq!(1, range.start.unwrap().value());
        assert!(range.contains(&Timestamp::new_millisecond(1)));

        // reversed bounds give the empty range
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new_millisecond(2)),
            Some(Timestamp::new_millisecond(1)),
        );
        assert!(range.is_empty());
    }

    /// A range survives a JSON serialize/deserialize round trip for every time unit.
    #[test]
    fn test_serialize_timestamp_range() {
        macro_rules! test_serde_for_unit {
            ($($unit: expr),*) => {
                $(
                let original_range = TimestampRange::with_unit(0, 10, $unit).unwrap();
                let string = serde_json::to_string(&original_range).unwrap();
                let deserialized: TimestampRange = serde_json::from_str(&string).unwrap();
                assert_eq!(original_range, deserialized);
                )*
            };
        }

        test_serde_for_unit!(
            TimeUnit::Second,
            TimeUnit::Millisecond,
            TimeUnit::Microsecond,
            TimeUnit::Nanosecond
        );
    }
}