common_time/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16use std::ops::{Bound, RangeBounds};
17
18use serde::{Deserialize, Serialize};
19
20use crate::Timestamp;
21use crate::timestamp::TimeUnit;
22use crate::timestamp_millis::TimestampMillis;
23
24/// A half-open time range.
25///
26/// The range contains values that `value >= start` and `val < end`.
27///
28/// The range is empty iff `start == end == "the default value of T"`
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct GenericRange<T> {
31    start: Option<T>,
32    end: Option<T>,
33}
34
35impl<T> GenericRange<T>
36where
37    T: Copy + PartialOrd + Default,
38{
39    /// Computes the AND'ed range with other.  
40    pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
41        let start = match (self.start(), other.start()) {
42            (Some(l), Some(r)) => {
43                if l > r {
44                    Some(*l)
45                } else {
46                    Some(*r)
47                }
48            }
49            (Some(l), None) => Some(*l),
50            (None, Some(r)) => Some(*r),
51            (None, None) => None,
52        };
53
54        let end = match (self.end(), other.end()) {
55            (Some(l), Some(r)) => {
56                if l > r {
57                    Some(*r)
58                } else {
59                    Some(*l)
60                }
61            }
62            (Some(l), None) => Some(*l),
63            (None, Some(r)) => Some(*r),
64            (None, None) => None,
65        };
66
67        Self::from_optional(start, end)
68    }
69
70    /// Compute the OR'ed range of two ranges.
71    /// Notice: this method does not compute the exact OR'ed operation for simplicity.
72    /// For example, `[1, 2)` or'ed with `[4, 5)` will produce `[1, 5)`
73    /// instead of `[1, 2) ∪ [4, 5)`
74    pub fn or(&self, other: &GenericRange<T>) -> GenericRange<T> {
75        if self.is_empty() {
76            return *other;
77        }
78
79        if other.is_empty() {
80            return *self;
81        }
82        let start = match (self.start(), other.start()) {
83            (Some(l), Some(r)) => {
84                if l > r {
85                    Some(*r)
86                } else {
87                    Some(*l)
88                }
89            }
90            (Some(_), None) => None,
91            (None, Some(_)) => None,
92            (None, None) => None,
93        };
94
95        let end = match (self.end(), other.end()) {
96            (Some(l), Some(r)) => {
97                if l > r {
98                    Some(*l)
99                } else {
100                    Some(*r)
101                }
102            }
103            (Some(_), None) => None,
104            (None, Some(_)) => None,
105            (None, None) => None,
106        };
107
108        Self::from_optional(start, end)
109    }
110
111    /// Checks if current range intersect with target.
112    pub fn intersects(&self, target: &GenericRange<T>) -> bool {
113        !self.and(target).is_empty()
114    }
115
116    /// Create an empty range.
117    pub fn empty() -> GenericRange<T> {
118        GenericRange {
119            start: Some(T::default()),
120            end: Some(T::default()),
121        }
122    }
123
124    /// Create GenericRange from optional start and end.
125    /// If the present value of start >= the present value of end, it will return an empty range
126    /// with the default value of `T`.
127    fn from_optional(start: Option<T>, end: Option<T>) -> GenericRange<T> {
128        match (start, end) {
129            (Some(start_val), Some(end_val)) => {
130                if start_val < end_val {
131                    Self {
132                        start: Some(start_val),
133                        end: Some(end_val),
134                    }
135                } else {
136                    Self::empty()
137                }
138            }
139            (s, e) => Self { start: s, end: e },
140        }
141    }
142}
143
144impl<T> GenericRange<T> {
145    /// Creates a new range that contains values in `[start, end)`.
146    ///
147    /// Returns `None` if `start` > `end`.
148    pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
149        if start <= end {
150            let (start, end) = (Some(start.into()), Some(end.into()));
151            Some(GenericRange { start, end })
152        } else {
153            None
154        }
155    }
156
157    /// Return a range containing all possible values.
158    pub fn min_to_max() -> GenericRange<T> {
159        Self {
160            start: None,
161            end: None,
162        }
163    }
164
165    /// Returns the lower bound of the range (inclusive).
166    #[inline]
167    pub fn start(&self) -> &Option<T> {
168        &self.start
169    }
170
171    /// Returns the upper bound of the range (exclusive).
172    #[inline]
173    pub fn end(&self) -> &Option<T> {
174        &self.end
175    }
176
177    /// Returns true if `timestamp` is contained in the range.
178    pub fn contains<U: PartialOrd<T>>(&self, target: &U) -> bool {
179        match (&self.start, &self.end) {
180            (Some(start), Some(end)) => *target >= *start && *target < *end,
181            (Some(start), None) => *target >= *start,
182            (None, Some(end)) => *target < *end,
183            (None, None) => true,
184        }
185    }
186}
187
188impl<T: PartialOrd> GenericRange<T> {
189    /// Returns true if the range contains no timestamps.
190    #[inline]
191    pub fn is_empty(&self) -> bool {
192        match (&self.start, &self.end) {
193            (Some(start), Some(end)) => start == end,
194            _ => false,
195        }
196    }
197}
198
199pub type TimestampRange = GenericRange<Timestamp>;
200
201impl Display for TimestampRange {
202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203        let s = match (&self.start, &self.end) {
204            (Some(start), Some(end)) => {
205                format!(
206                    "TimestampRange{{[{}{},{}{})}}",
207                    start.value(),
208                    start.unit().short_name(),
209                    end.value(),
210                    end.unit().short_name()
211                )
212            }
213            (Some(start), None) => {
214                format!(
215                    "TimestampRange{{[{}{},#)}}",
216                    start.value(),
217                    start.unit().short_name()
218                )
219            }
220            (None, Some(end)) => {
221                format!(
222                    "TimestampRange{{[#,{}{})}}",
223                    end.value(),
224                    end.unit().short_name()
225                )
226            }
227            (None, None) => "TimestampRange{{[#,#)}}".to_string(),
228        };
229        f.write_str(&s)
230    }
231}
232
233impl TimestampRange {
234    /// Create a TimestampRange with optional inclusive end timestamp.
235    /// If end timestamp is present and is less than start timestamp, this method will return
236    /// an empty range.
237    /// ### Caveat
238    /// If the given end timestamp's value is `i64::MAX`, which will result into overflow when added
239    /// by 1(the end is inclusive), this method does not try to convert the time unit of end
240    /// timestamp, instead it just return `[start, INF)`. This exaggerates the range but does not
241    /// affect correctness.  
242    pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
243        // check for emptiness
244        if let (Some(start_ts), Some(end_ts)) = (start, end)
245            && start_ts > end_ts
246        {
247            return Self::empty();
248        }
249
250        let end = if let Some(end) = end {
251            end.value()
252                .checked_add(1)
253                .map(|v| Timestamp::new(v, end.unit()))
254        } else {
255            None
256        };
257        Self::from_optional(start, end)
258    }
259
260    /// Shortcut method to create a timestamp range with given start/end value and time unit.
261    /// Returns empty iff `start` > `end`.
262    pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
263        let start = Timestamp::new(start, unit);
264        let end = Timestamp::new(end, unit);
265        Self::new(start, end)
266    }
267
268    /// Create a range that containing only given `ts`.
269    /// ### Notice:
270    /// Left-close right-open range cannot properly represent range with a single value.
271    /// For simplicity, this implementation returns an approximate range `[ts, ts+1)` instead.
272    pub fn single(ts: Timestamp) -> Self {
273        let unit = ts.unit();
274        let start = Some(ts);
275        let end = ts.value().checked_add(1).map(|v| Timestamp::new(v, unit));
276
277        Self::from_optional(start, end)
278    }
279
280    /// Create a range `[start, INF)`.
281    /// ### Notice
282    /// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`.
283    /// You may resort to `[start-1, ...)` instead.
284    pub fn from_start(start: Timestamp) -> Self {
285        Self {
286            start: Some(start),
287            end: None,
288        }
289    }
290
291    /// Create a range `[-INF, end)`.
292    /// ### Notice
293    /// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`.
294    /// If `inclusive` is true, this method returns `[-INF, end+1)` instead.
295    pub fn until_end(end: Timestamp, inclusive: bool) -> Self {
296        let end = if inclusive {
297            end.value()
298                .checked_add(1)
299                .map(|v| Timestamp::new(v, end.unit()))
300        } else {
301            Some(end)
302        };
303        Self { start: None, end }
304    }
305}
306
307/// Create [TimestampRange] from a timestamp tuple.
308/// The tuple's two elements form the "start" and "end" (both inclusive) of the timestamp range.
309impl From<(Timestamp, Timestamp)> for TimestampRange {
310    fn from((start, end): (Timestamp, Timestamp)) -> Self {
311        if start > end {
312            Self::empty()
313        } else {
314            Self::new_inclusive(Some(start), Some(end))
315        }
316    }
317}
318
319/// Create [TimestampRange] from Rust's "range".
320impl<R: RangeBounds<Timestamp>> From<R> for TimestampRange {
321    fn from(r: R) -> Self {
322        let start = match r.start_bound() {
323            Bound::Included(x) => Some(*x),
324            Bound::Excluded(x) => x
325                .value()
326                .checked_sub(1)
327                .map(|v| Timestamp::new(v, x.unit())),
328            Bound::Unbounded => None,
329        };
330        let end = match r.end_bound() {
331            Bound::Included(x) => x
332                .value()
333                .checked_add(1)
334                .map(|v| Timestamp::new(v, x.unit())),
335            Bound::Excluded(x) => Some(*x),
336            Bound::Unbounded => None,
337        };
338        Self::from_optional(start, end)
339    }
340}
341
342/// Time range in milliseconds.
343pub type RangeMillis = GenericRange<TimestampMillis>;
344
345#[cfg(test)]
346mod tests {
347
348    use super::*;
349
350    #[test]
351    fn test_new_range() {
352        let (start, end) = (TimestampMillis::new(0), TimestampMillis::new(100));
353        let range = RangeMillis::new(start, end).unwrap();
354
355        assert_eq!(Some(start), *range.start());
356        assert_eq!(Some(end), *range.end());
357
358        let range2 = RangeMillis::new(0, 100).unwrap();
359        assert_eq!(range, range2);
360
361        let range_eq = RangeMillis::new(123, 123).unwrap();
362        assert_eq!(range_eq.start(), range_eq.end());
363
364        assert_eq!(None, RangeMillis::new(1, 0));
365
366        let range = RangeMillis::empty();
367        assert_eq!(range.start(), range.end());
368        assert_eq!(Some(TimestampMillis::new(0)), *range.start());
369    }
370
371    #[test]
372    fn test_timestamp_range_new_inclusive() {
373        let range = TimestampRange::new_inclusive(
374            Some(Timestamp::new(i64::MAX - 1, TimeUnit::Second)),
375            Some(Timestamp::new(i64::MAX, TimeUnit::Millisecond)),
376        );
377        assert!(range.is_empty());
378
379        let range = TimestampRange::new_inclusive(
380            Some(Timestamp::new(1, TimeUnit::Second)),
381            Some(Timestamp::new(1, TimeUnit::Millisecond)),
382        );
383        assert!(range.is_empty());
384
385        let range = TimestampRange::new_inclusive(
386            Some(Timestamp::new(1, TimeUnit::Second)),
387            Some(Timestamp::new(i64::MAX, TimeUnit::Millisecond)),
388        );
389        assert!(range.end.is_none());
390    }
391
392    #[test]
393    fn test_range_contains() {
394        let range = RangeMillis::new(-10, 10).unwrap();
395        assert!(!range.is_empty());
396        assert!(range.contains(&-10));
397        assert!(range.contains(&0));
398        assert!(range.contains(&9));
399        assert!(!range.contains(&10));
400
401        let range = RangeMillis::new(i64::MIN, i64::MAX).unwrap();
402        assert!(!range.is_empty());
403        assert!(range.contains(&TimestampMillis::MIN));
404        assert!(range.contains(&TimestampMillis::MAX));
405        assert!(!range.contains(&TimestampMillis::INF));
406
407        let range = RangeMillis::new(0, 0).unwrap();
408        assert!(range.is_empty());
409        assert!(!range.contains(&0));
410    }
411
412    #[test]
413    fn test_range_with_diff_unit() {
414        let r1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
415        let r2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
416        assert_eq!(r2, r1);
417
418        let r3 = TimestampRange::with_unit(0, 0, TimeUnit::Second).unwrap();
419        let r4 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
420        assert_eq!(r3, r4);
421
422        let r5 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
423        let r6 = TimestampRange::with_unit(0, 0, TimeUnit::Microsecond).unwrap();
424        assert_eq!(r5, r6);
425
426        let r5 = TimestampRange::with_unit(-1, 1, TimeUnit::Second).unwrap();
427        let r6 = TimestampRange::with_unit(-1000, 1000, TimeUnit::Millisecond).unwrap();
428        assert_eq!(r5, r6);
429    }
430
431    #[test]
432    fn test_range_and() {
433        assert_eq!(
434            TimestampRange::with_unit(5, 10, TimeUnit::Millisecond).unwrap(),
435            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
436                .unwrap()
437                .and(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
438        );
439
440        let empty = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
441            .unwrap()
442            .and(&TimestampRange::with_unit(11, 20, TimeUnit::Millisecond).unwrap());
443        assert!(empty.is_empty());
444
445        // whatever AND'ed with empty shall return empty
446        let empty_and_all = empty.and(&TimestampRange::min_to_max());
447        assert!(empty_and_all.is_empty());
448        assert!(empty.and(&empty).is_empty());
449        assert!(
450            empty
451                .and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap())
452                .is_empty()
453        );
454
455        // AND TimestampRange with different unit
456        let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
457            .unwrap()
458            .and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap());
459        assert_eq!(
460            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
461            anded
462        );
463
464        let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
465            .unwrap()
466            .and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap());
467        assert_eq!(
468            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
469            anded
470        );
471    }
472
473    #[test]
474    fn test_range_or() {
475        assert_eq!(
476            TimestampRange::with_unit(1, 20, TimeUnit::Millisecond).unwrap(),
477            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
478                .unwrap()
479                .or(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
480        );
481
482        assert_eq!(
483            TimestampRange::min_to_max(),
484            TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
485        );
486
487        let empty = TimestampRange::empty().or(&TimestampRange::empty());
488        assert!(empty.is_empty());
489
490        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
491        let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
492        assert_eq!(
493            TimestampRange::with_unit(-30, 0, TimeUnit::Second).unwrap(),
494            t1.or(&t2)
495        );
496
497        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
498        assert_eq!(t1, t1.or(&t1));
499    }
500
501    #[test]
502    fn test_intersect() {
503        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
504        let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
505        assert!(!t1.intersects(&t2));
506
507        let t1 = TimestampRange::with_unit(10, 20, TimeUnit::Second).unwrap();
508        let t2 = TimestampRange::with_unit(0, 30, TimeUnit::Second).unwrap();
509        assert!(t1.intersects(&t2));
510
511        let t1 = TimestampRange::with_unit(-20, -10, TimeUnit::Second).unwrap();
512        let t2 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
513        assert!(!t1.intersects(&t2));
514
515        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
516        let t2 = TimestampRange::with_unit(999, 1000, TimeUnit::Millisecond).unwrap();
517        assert!(t1.intersects(&t2));
518
519        let t1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
520        let t2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
521        assert!(t1.intersects(&t2));
522
523        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
524        assert!(t1.intersects(&t1));
525
526        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
527        let t2 = TimestampRange::empty();
528        assert!(!t1.intersects(&t2));
529
530        // empty range does not intersect with empty range
531        let empty = TimestampRange::empty();
532        assert!(!empty.intersects(&empty));
533
534        // full range intersects with full range
535        let full = TimestampRange::min_to_max();
536        assert!(full.intersects(&full));
537
538        assert!(!full.intersects(&empty));
539    }
540
541    #[test]
542    fn test_new_inclusive() {
543        let range = TimestampRange::new_inclusive(
544            Some(Timestamp::new_millisecond(1)),
545            Some(Timestamp::new_millisecond(3)),
546        );
547        assert!(!range.is_empty());
548        assert!(range.contains(&Timestamp::new_millisecond(1)));
549        assert!(range.contains(&Timestamp::new_millisecond(3)));
550
551        let range = TimestampRange::new_inclusive(
552            Some(Timestamp::new_millisecond(1)),
553            Some(Timestamp::new_millisecond(1)),
554        );
555        assert!(!range.is_empty());
556        assert_eq!(1, range.start.unwrap().value());
557        assert!(range.contains(&Timestamp::new_millisecond(1)));
558
559        let range = TimestampRange::new_inclusive(
560            Some(Timestamp::new_millisecond(2)),
561            Some(Timestamp::new_millisecond(1)),
562        );
563        assert!(range.is_empty());
564    }
565
566    #[test]
567    fn test_serialize_timestamp_range() {
568        macro_rules! test_serde_for_unit {
569            ($($unit: expr),*) => {
570                $(
571                let original_range = TimestampRange::with_unit(0, 10, $unit).unwrap();
572                let string = serde_json::to_string(&original_range).unwrap();
573                let deserialized: TimestampRange = serde_json::from_str(&string).unwrap();
574                assert_eq!(original_range, deserialized);
575                )*
576            };
577        }
578
579        test_serde_for_unit!(
580            TimeUnit::Second,
581            TimeUnit::Millisecond,
582            TimeUnit::Microsecond,
583            TimeUnit::Nanosecond
584        );
585    }
586
587    #[test]
588    fn test_from_timestamp_tuple() {
589        let timestamp_range: TimestampRange =
590            (Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)).into();
591        assert_eq!(
592            timestamp_range,
593            TimestampRange::from_optional(
594                Some(Timestamp::new_millisecond(1)),
595                Some(Timestamp::new_millisecond(4))
596            )
597        );
598
599        let timestamp_range: TimestampRange =
600            (Timestamp::new_millisecond(1), Timestamp::new_millisecond(1)).into();
601        assert_eq!(
602            timestamp_range,
603            TimestampRange::from_optional(
604                Some(Timestamp::new_millisecond(1)),
605                Some(Timestamp::new_millisecond(2))
606            )
607        );
608
609        let timestamp_range: TimestampRange =
610            (Timestamp::new_second(1), Timestamp::new_millisecond(3)).into();
611        assert_eq!(timestamp_range, TimestampRange::empty());
612    }
613
614    #[test]
615    fn test_from_timestamp_range() {
616        let timestamp_range: TimestampRange =
617            (Timestamp::new_millisecond(1)..Timestamp::new_millisecond(3)).into();
618        assert_eq!(
619            timestamp_range,
620            TimestampRange::from_optional(
621                Some(Timestamp::new_millisecond(1)),
622                Some(Timestamp::new_millisecond(3))
623            )
624        );
625
626        let timestamp_range: TimestampRange =
627            (Timestamp::new_millisecond(1)..=Timestamp::new_millisecond(3)).into();
628        assert_eq!(
629            timestamp_range,
630            TimestampRange::from_optional(
631                Some(Timestamp::new_millisecond(1)),
632                Some(Timestamp::new_millisecond(4))
633            )
634        );
635
636        let timestamp_range: TimestampRange = (Timestamp::new_millisecond(1)..).into();
637        assert_eq!(
638            timestamp_range,
639            TimestampRange::from_optional(Some(Timestamp::new_millisecond(1)), None)
640        );
641
642        let timestamp_range: TimestampRange = (..Timestamp::new_millisecond(3)).into();
643        assert_eq!(
644            timestamp_range,
645            TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(3)))
646        );
647
648        let timestamp_range: TimestampRange = (..=Timestamp::new_millisecond(3)).into();
649        assert_eq!(
650            timestamp_range,
651            TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(4)))
652        );
653
654        let timestamp_range: TimestampRange = (..).into();
655        assert_eq!(timestamp_range, TimestampRange::min_to_max(),);
656    }
657}