common_time/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16use std::ops::{Bound, RangeBounds};
17
18use serde::{Deserialize, Serialize};
19
20use crate::timestamp::TimeUnit;
21use crate::timestamp_millis::TimestampMillis;
22use crate::Timestamp;
23
/// A half-open range `[start, end)` over values of type `T`.
///
/// A value is inside the range when `value >= start` and `value < end`.
/// A bound stored as `None` is unbounded on that side, so a range whose two
/// bounds are both `None` contains every value.
///
/// `is_empty` reports a range as empty when both bounds are present and equal.
/// `empty()` builds the canonical empty range, whose two bounds are both
/// `T::default()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct GenericRange<T> {
    /// Inclusive lower bound; `None` means unbounded below.
    start: Option<T>,
    /// Exclusive upper bound; `None` means unbounded above.
    end: Option<T>,
}
34
35impl<T> GenericRange<T>
36where
37    T: Copy + PartialOrd + Default,
38{
39    /// Computes the AND'ed range with other.  
40    pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
41        let start = match (self.start(), other.start()) {
42            (Some(l), Some(r)) => {
43                if l > r {
44                    Some(*l)
45                } else {
46                    Some(*r)
47                }
48            }
49            (Some(l), None) => Some(*l),
50            (None, Some(r)) => Some(*r),
51            (None, None) => None,
52        };
53
54        let end = match (self.end(), other.end()) {
55            (Some(l), Some(r)) => {
56                if l > r {
57                    Some(*r)
58                } else {
59                    Some(*l)
60                }
61            }
62            (Some(l), None) => Some(*l),
63            (None, Some(r)) => Some(*r),
64            (None, None) => None,
65        };
66
67        Self::from_optional(start, end)
68    }
69
70    /// Compute the OR'ed range of two ranges.
71    /// Notice: this method does not compute the exact OR'ed operation for simplicity.
72    /// For example, `[1, 2)` or'ed with `[4, 5)` will produce `[1, 5)`
73    /// instead of `[1, 2) ∪ [4, 5)`
74    pub fn or(&self, other: &GenericRange<T>) -> GenericRange<T> {
75        if self.is_empty() {
76            return *other;
77        }
78
79        if other.is_empty() {
80            return *self;
81        }
82        let start = match (self.start(), other.start()) {
83            (Some(l), Some(r)) => {
84                if l > r {
85                    Some(*r)
86                } else {
87                    Some(*l)
88                }
89            }
90            (Some(_), None) => None,
91            (None, Some(_)) => None,
92            (None, None) => None,
93        };
94
95        let end = match (self.end(), other.end()) {
96            (Some(l), Some(r)) => {
97                if l > r {
98                    Some(*l)
99                } else {
100                    Some(*r)
101                }
102            }
103            (Some(_), None) => None,
104            (None, Some(_)) => None,
105            (None, None) => None,
106        };
107
108        Self::from_optional(start, end)
109    }
110
111    /// Checks if current range intersect with target.
112    pub fn intersects(&self, target: &GenericRange<T>) -> bool {
113        !self.and(target).is_empty()
114    }
115
116    /// Create an empty range.
117    pub fn empty() -> GenericRange<T> {
118        GenericRange {
119            start: Some(T::default()),
120            end: Some(T::default()),
121        }
122    }
123
124    /// Create GenericRange from optional start and end.
125    /// If the present value of start >= the present value of end, it will return an empty range
126    /// with the default value of `T`.
127    fn from_optional(start: Option<T>, end: Option<T>) -> GenericRange<T> {
128        match (start, end) {
129            (Some(start_val), Some(end_val)) => {
130                if start_val < end_val {
131                    Self {
132                        start: Some(start_val),
133                        end: Some(end_val),
134                    }
135                } else {
136                    Self::empty()
137                }
138            }
139            (s, e) => Self { start: s, end: e },
140        }
141    }
142}
143
144impl<T> GenericRange<T> {
145    /// Creates a new range that contains values in `[start, end)`.
146    ///
147    /// Returns `None` if `start` > `end`.
148    pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
149        if start <= end {
150            let (start, end) = (Some(start.into()), Some(end.into()));
151            Some(GenericRange { start, end })
152        } else {
153            None
154        }
155    }
156
157    /// Return a range containing all possible values.
158    pub fn min_to_max() -> GenericRange<T> {
159        Self {
160            start: None,
161            end: None,
162        }
163    }
164
165    /// Returns the lower bound of the range (inclusive).
166    #[inline]
167    pub fn start(&self) -> &Option<T> {
168        &self.start
169    }
170
171    /// Returns the upper bound of the range (exclusive).
172    #[inline]
173    pub fn end(&self) -> &Option<T> {
174        &self.end
175    }
176
177    /// Returns true if `timestamp` is contained in the range.
178    pub fn contains<U: PartialOrd<T>>(&self, target: &U) -> bool {
179        match (&self.start, &self.end) {
180            (Some(start), Some(end)) => *target >= *start && *target < *end,
181            (Some(start), None) => *target >= *start,
182            (None, Some(end)) => *target < *end,
183            (None, None) => true,
184        }
185    }
186}
187
188impl<T: PartialOrd> GenericRange<T> {
189    /// Returns true if the range contains no timestamps.
190    #[inline]
191    pub fn is_empty(&self) -> bool {
192        match (&self.start, &self.end) {
193            (Some(start), Some(end)) => start == end,
194            _ => false,
195        }
196    }
197}
198
/// A [`GenericRange`] whose bounds are [`Timestamp`] values.
pub type TimestampRange = GenericRange<Timestamp>;
200
201impl Display for TimestampRange {
202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203        let s = match (&self.start, &self.end) {
204            (Some(start), Some(end)) => {
205                format!(
206                    "TimestampRange{{[{}{},{}{})}}",
207                    start.value(),
208                    start.unit().short_name(),
209                    end.value(),
210                    end.unit().short_name()
211                )
212            }
213            (Some(start), None) => {
214                format!(
215                    "TimestampRange{{[{}{},#)}}",
216                    start.value(),
217                    start.unit().short_name()
218                )
219            }
220            (None, Some(end)) => {
221                format!(
222                    "TimestampRange{{[#,{}{})}}",
223                    end.value(),
224                    end.unit().short_name()
225                )
226            }
227            (None, None) => "TimestampRange{{[#,#)}}".to_string(),
228        };
229        f.write_str(&s)
230    }
231}
232
233impl TimestampRange {
234    /// Create a TimestampRange with optional inclusive end timestamp.
235    /// If end timestamp is present and is less than start timestamp, this method will return
236    /// an empty range.
237    /// ### Caveat
238    /// If the given end timestamp's value is `i64::MAX`, which will result into overflow when added
239    /// by 1(the end is inclusive), this method does not try to convert the time unit of end
240    /// timestamp, instead it just return `[start, INF)`. This exaggerates the range but does not
241    /// affect correctness.  
242    pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
243        // check for emptiness
244        if let (Some(start_ts), Some(end_ts)) = (start, end) {
245            if start_ts > end_ts {
246                return Self::empty();
247            }
248        }
249
250        let end = if let Some(end) = end {
251            end.value()
252                .checked_add(1)
253                .map(|v| Timestamp::new(v, end.unit()))
254        } else {
255            None
256        };
257        Self::from_optional(start, end)
258    }
259
260    /// Shortcut method to create a timestamp range with given start/end value and time unit.
261    /// Returns empty iff `start` > `end`.
262    pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
263        let start = Timestamp::new(start, unit);
264        let end = Timestamp::new(end, unit);
265        Self::new(start, end)
266    }
267
268    /// Create a range that containing only given `ts`.
269    /// ### Notice:
270    /// Left-close right-open range cannot properly represent range with a single value.
271    /// For simplicity, this implementation returns an approximate range `[ts, ts+1)` instead.
272    pub fn single(ts: Timestamp) -> Self {
273        let unit = ts.unit();
274        let start = Some(ts);
275        let end = ts.value().checked_add(1).map(|v| Timestamp::new(v, unit));
276
277        Self::from_optional(start, end)
278    }
279
280    /// Create a range `[start, INF)`.
281    /// ### Notice
282    /// Left-close right-open range cannot properly represent range with exclusive start like: `(start, ...)`.
283    /// You may resort to `[start-1, ...)` instead.
284    pub fn from_start(start: Timestamp) -> Self {
285        Self {
286            start: Some(start),
287            end: None,
288        }
289    }
290
291    /// Create a range `[-INF, end)`.
292    /// ### Notice
293    /// Left-close right-open range cannot properly represent range with inclusive end like: `[..., END]`.
294    /// If `inclusive` is true, this method returns `[-INF, end+1)` instead.
295    pub fn until_end(end: Timestamp, inclusive: bool) -> Self {
296        let end = if inclusive {
297            end.value()
298                .checked_add(1)
299                .map(|v| Timestamp::new(v, end.unit()))
300        } else {
301            Some(end)
302        };
303        Self { start: None, end }
304    }
305}
306
307/// Create [TimestampRange] from a timestamp tuple.
308/// The tuple's two elements form the "start" and "end" (both inclusive) of the timestamp range.
309impl From<(Timestamp, Timestamp)> for TimestampRange {
310    fn from((start, end): (Timestamp, Timestamp)) -> Self {
311        if start > end {
312            Self::empty()
313        } else {
314            Self::new_inclusive(Some(start), Some(end))
315        }
316    }
317}
318
319/// Create [TimestampRange] from Rust's "range".
320impl<R: RangeBounds<Timestamp>> From<R> for TimestampRange {
321    fn from(r: R) -> Self {
322        let start = match r.start_bound() {
323            Bound::Included(x) => Some(*x),
324            Bound::Excluded(x) => x
325                .value()
326                .checked_sub(1)
327                .map(|v| Timestamp::new(v, x.unit())),
328            Bound::Unbounded => None,
329        };
330        let end = match r.end_bound() {
331            Bound::Included(x) => x
332                .value()
333                .checked_add(1)
334                .map(|v| Timestamp::new(v, x.unit())),
335            Bound::Excluded(x) => Some(*x),
336            Bound::Unbounded => None,
337        };
338        Self::from_optional(start, end)
339    }
340}
341
/// Time range in milliseconds: a [`GenericRange`] whose bounds are
/// [`TimestampMillis`] values.
pub type RangeMillis = GenericRange<TimestampMillis>;
344
#[cfg(test)]
mod tests {

    use super::*;

    /// `new` keeps the given bounds, accepts equal bounds and rejects `start > end`.
    #[test]
    fn test_new_range() {
        let (start, end) = (TimestampMillis::new(0), TimestampMillis::new(100));
        let range = RangeMillis::new(start, end).unwrap();

        assert_eq!(Some(start), *range.start());
        assert_eq!(Some(end), *range.end());

        let range2 = RangeMillis::new(0, 100).unwrap();
        assert_eq!(range, range2);

        let range_eq = RangeMillis::new(123, 123).unwrap();
        assert_eq!(range_eq.start(), range_eq.end());

        assert_eq!(None, RangeMillis::new(1, 0));

        let range = RangeMillis::empty();
        assert_eq!(range.start(), range.end());
        assert_eq!(Some(TimestampMillis::new(0)), *range.start());
    }

    /// `new_inclusive` with bounds in different units, including the `i64::MAX` end.
    #[test]
    fn test_timestamp_range_new_inclusive() {
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new(i64::MAX - 1, TimeUnit::Second)),
            Some(Timestamp::new(i64::MAX, TimeUnit::Millisecond)),
        );
        assert!(range.is_empty());

        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new(1, TimeUnit::Second)),
            Some(Timestamp::new(1, TimeUnit::Millisecond)),
        );
        assert!(range.is_empty());

        // end value is i64::MAX: the upper bound is dropped instead of overflowing
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new(1, TimeUnit::Second)),
            Some(Timestamp::new(i64::MAX, TimeUnit::Millisecond)),
        );
        assert!(range.end.is_none());
    }

    /// `contains` honours the inclusive start and the exclusive end.
    #[test]
    fn test_range_contains() {
        let range = RangeMillis::new(-10, 10).unwrap();
        assert!(!range.is_empty());
        assert!(range.contains(&-10));
        assert!(range.contains(&0));
        assert!(range.contains(&9));
        assert!(!range.contains(&10));

        let range = RangeMillis::new(i64::MIN, i64::MAX).unwrap();
        assert!(!range.is_empty());
        assert!(range.contains(&TimestampMillis::MIN));
        assert!(range.contains(&TimestampMillis::MAX));
        assert!(!range.contains(&TimestampMillis::INF));

        let range = RangeMillis::new(0, 0).unwrap();
        assert!(range.is_empty());
        assert!(!range.contains(&0));
    }

    /// Ranges describing the same instants in different units compare equal.
    #[test]
    fn test_range_with_diff_unit() {
        let r1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
        let r2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
        assert_eq!(r2, r1);

        let r3 = TimestampRange::with_unit(0, 0, TimeUnit::Second).unwrap();
        let r4 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
        assert_eq!(r3, r4);

        let r5 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
        let r6 = TimestampRange::with_unit(0, 0, TimeUnit::Microsecond).unwrap();
        assert_eq!(r5, r6);

        let r5 = TimestampRange::with_unit(-1, 1, TimeUnit::Second).unwrap();
        let r6 = TimestampRange::with_unit(-1000, 1000, TimeUnit::Millisecond).unwrap();
        assert_eq!(r5, r6);
    }

    #[test]
    fn test_range_and() {
        assert_eq!(
            TimestampRange::with_unit(5, 10, TimeUnit::Millisecond).unwrap(),
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
                .unwrap()
                .and(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
        );

        let empty = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
            .unwrap()
            .and(&TimestampRange::with_unit(11, 20, TimeUnit::Millisecond).unwrap());
        assert!(empty.is_empty());

        // whatever AND'ed with empty shall return empty
        let empty_and_all = empty.and(&TimestampRange::min_to_max());
        assert!(empty_and_all.is_empty());
        assert!(empty.and(&empty).is_empty());
        assert!(empty
            .and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap())
            .is_empty());

        // AND TimestampRange with different unit
        let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
            .unwrap()
            .and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap());
        assert_eq!(
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
            anded
        );

        // AND is symmetric: swapping the operands yields the same range
        let anded = TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond)
            .unwrap()
            .and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap());
        assert_eq!(
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
            anded
        );
    }

    #[test]
    fn test_range_or() {
        assert_eq!(
            TimestampRange::with_unit(1, 20, TimeUnit::Millisecond).unwrap(),
            TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
                .unwrap()
                .or(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
        );

        assert_eq!(
            TimestampRange::min_to_max(),
            TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
        );

        let empty = TimestampRange::empty().or(&TimestampRange::empty());
        assert!(empty.is_empty());

        // disjoint ranges: the gap between them is covered as well
        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
        assert_eq!(
            TimestampRange::with_unit(-30, 0, TimeUnit::Second).unwrap(),
            t1.or(&t2)
        );

        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        assert_eq!(t1, t1.or(&t1));
    }

    #[test]
    fn test_intersect() {
        let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
        assert!(!t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(10, 20, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(0, 30, TimeUnit::Second).unwrap();
        assert!(t1.intersects(&t2));

        // adjacent ranges share no value because the end is exclusive
        let t1 = TimestampRange::with_unit(-20, -10, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
        assert!(!t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(999, 1000, TimeUnit::Millisecond).unwrap();
        assert!(t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
        assert!(t1.intersects(&t2));

        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
        assert!(t1.intersects(&t1));

        let t1 = TimestampRange::with_unit(0, 1, TimeUnit::Second).unwrap();
        let t2 = TimestampRange::empty();
        assert!(!t1.intersects(&t2));

        // empty range does not intersect with empty range
        let empty = TimestampRange::empty();
        assert!(!empty.intersects(&empty));

        // full range intersects with full range
        let full = TimestampRange::min_to_max();
        assert!(full.intersects(&full));

        assert!(!full.intersects(&empty));
    }

    #[test]
    fn test_new_inclusive() {
        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new_millisecond(1)),
            Some(Timestamp::new_millisecond(3)),
        );
        assert!(!range.is_empty());
        assert!(range.contains(&Timestamp::new_millisecond(1)));
        assert!(range.contains(&Timestamp::new_millisecond(3)));

        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new_millisecond(1)),
            Some(Timestamp::new_millisecond(1)),
        );
        assert!(!range.is_empty());
        assert_eq!(1, range.start.unwrap().value());
        assert!(range.contains(&Timestamp::new_millisecond(1)));

        let range = TimestampRange::new_inclusive(
            Some(Timestamp::new_millisecond(2)),
            Some(Timestamp::new_millisecond(1)),
        );
        assert!(range.is_empty());
    }

    /// A range survives a JSON round trip for every time unit.
    #[test]
    fn test_serialize_timestamp_range() {
        macro_rules! test_serde_for_unit {
            ($($unit: expr),*) => {
                $(
                let original_range = TimestampRange::with_unit(0, 10, $unit).unwrap();
                let string = serde_json::to_string(&original_range).unwrap();
                let deserialized: TimestampRange = serde_json::from_str(&string).unwrap();
                assert_eq!(original_range, deserialized);
                )*
            };
        }

        test_serde_for_unit!(
            TimeUnit::Second,
            TimeUnit::Millisecond,
            TimeUnit::Microsecond,
            TimeUnit::Nanosecond
        );
    }

    #[test]
    fn test_from_timestamp_tuple() {
        let timestamp_range: TimestampRange =
            (Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(
                Some(Timestamp::new_millisecond(1)),
                Some(Timestamp::new_millisecond(4))
            )
        );

        let timestamp_range: TimestampRange =
            (Timestamp::new_millisecond(1), Timestamp::new_millisecond(1)).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(
                Some(Timestamp::new_millisecond(1)),
                Some(Timestamp::new_millisecond(2))
            )
        );

        // start (1s) lies after end (3ms): the tuple converts to the empty range
        let timestamp_range: TimestampRange =
            (Timestamp::new_second(1), Timestamp::new_millisecond(3)).into();
        assert_eq!(timestamp_range, TimestampRange::empty());
    }

    #[test]
    fn test_from_timestamp_range() {
        let timestamp_range: TimestampRange =
            (Timestamp::new_millisecond(1)..Timestamp::new_millisecond(3)).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(
                Some(Timestamp::new_millisecond(1)),
                Some(Timestamp::new_millisecond(3))
            )
        );

        let timestamp_range: TimestampRange =
            (Timestamp::new_millisecond(1)..=Timestamp::new_millisecond(3)).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(
                Some(Timestamp::new_millisecond(1)),
                Some(Timestamp::new_millisecond(4))
            )
        );

        let timestamp_range: TimestampRange = (Timestamp::new_millisecond(1)..).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(Some(Timestamp::new_millisecond(1)), None)
        );

        let timestamp_range: TimestampRange = (..Timestamp::new_millisecond(3)).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(3)))
        );

        let timestamp_range: TimestampRange = (..=Timestamp::new_millisecond(3)).into();
        assert_eq!(
            timestamp_range,
            TimestampRange::from_optional(None, Some(Timestamp::new_millisecond(4)))
        );

        let timestamp_range: TimestampRange = (..).into();
        assert_eq!(timestamp_range, TimestampRange::min_to_max(),);
    }
}