datatypes/
timestamp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow_array::{
16    ArrayRef, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
17    TimestampNanosecondArray, TimestampSecondArray,
18};
19use arrow_schema::DataType;
20use common_time::timestamp::TimeUnit;
21use common_time::Timestamp;
22use paste::paste;
23use serde::{Deserialize, Serialize};
24
25use crate::prelude::{Scalar, Value, ValueRef};
26use crate::scalars::ScalarRef;
27use crate::types::{
28    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
29    TimestampSecondType, WrapperType,
30};
31use crate::vectors::{
32    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
33    TimestampSecondVector,
34};
35
36macro_rules! define_timestamp_with_unit {
37    ($unit: ident) => {
38        paste! {
39            #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40            pub struct [<Timestamp $unit>](pub Timestamp);
41
42            impl [<Timestamp $unit>] {
43                pub fn new(val: i64) -> Self {
44                    Self(Timestamp::new(val, TimeUnit::$unit))
45                }
46            }
47
48            impl Default for [<Timestamp $unit>] {
49                fn default() -> Self {
50                    Self::new(0)
51                }
52            }
53
54            impl From<[<Timestamp $unit>]> for Value {
55                fn from(t: [<Timestamp $unit>]) -> Value {
56                    Value::Timestamp(t.0)
57                }
58            }
59
60            impl From<[<Timestamp $unit>]> for serde_json::Value {
61                fn from(t: [<Timestamp $unit>]) -> Self {
62                    t.0.into()
63                }
64            }
65
66            impl From<[<Timestamp $unit>]> for ValueRef<'static> {
67                fn from(t: [<Timestamp $unit>]) -> Self {
68                    ValueRef::Timestamp(t.0)
69                }
70            }
71
72            impl Scalar for [<Timestamp $unit>] {
73                type VectorType = [<Timestamp $unit Vector>];
74                type RefType<'a> = [<Timestamp $unit>];
75
76                fn as_scalar_ref(&self) -> Self::RefType<'_> {
77                    *self
78                }
79
80                fn upcast_gat<'short, 'long: 'short>(
81                    long: Self::RefType<'long>,
82                ) -> Self::RefType<'short> {
83                    long
84                }
85            }
86
87            impl<'a> ScalarRef<'a> for [<Timestamp $unit>] {
88                type ScalarType = [<Timestamp $unit>];
89
90                fn to_owned_scalar(&self) -> Self::ScalarType {
91                    *self
92                }
93            }
94
95            impl WrapperType for [<Timestamp $unit>] {
96                type LogicalType = [<Timestamp $unit Type>];
97                type Native = i64;
98
99                fn from_native(value: Self::Native) -> Self {
100                    Self::new(value)
101                }
102
103                fn into_native(self) -> Self::Native {
104                    self.0.into()
105                }
106            }
107
108            impl From<i64> for [<Timestamp $unit>] {
109                fn from(val: i64) -> Self {
110                    [<Timestamp $unit>]::from_native(val)
111                }
112            }
113
114            impl From<[<Timestamp $unit>]> for i64{
115                fn from(val: [<Timestamp $unit>]) -> Self {
116                    val.0.value()
117                }
118            }
119
120            impl TryFrom<Value> for Option<[<Timestamp $unit>]> {
121                type Error = $crate::error::Error;
122
123                #[inline]
124                fn try_from(from: Value) -> std::result::Result<Self, Self::Error> {
125                    match from {
126                        Value::Timestamp(v) if v.unit() == TimeUnit::$unit => {
127                            Ok(Some([<Timestamp $unit>](v)))
128                        },
129                        Value::Null => Ok(None),
130                        _ => $crate::error::TryFromValueSnafu {
131                            reason: format!("{:?} is not a {}", from, stringify!([<Timestamp $unit>])),
132                        }
133                        .fail(),
134                    }
135                }
136            }
137        }
138    };
139}
140
141define_timestamp_with_unit!(Second);
142define_timestamp_with_unit!(Millisecond);
143define_timestamp_with_unit!(Microsecond);
144define_timestamp_with_unit!(Nanosecond);
145
146pub fn timestamp_array_to_primitive(
147    ts_array: &ArrayRef,
148) -> Option<(
149    PrimitiveArray<arrow_array::types::Int64Type>,
150    arrow::datatypes::TimeUnit,
151)> {
152    let DataType::Timestamp(unit, _) = ts_array.data_type() else {
153        return None;
154    };
155
156    let ts_primitive = match unit {
157        arrow_schema::TimeUnit::Second => ts_array
158            .as_any()
159            .downcast_ref::<TimestampSecondArray>()
160            .unwrap()
161            .reinterpret_cast::<arrow_array::types::Int64Type>(),
162        arrow_schema::TimeUnit::Millisecond => ts_array
163            .as_any()
164            .downcast_ref::<TimestampMillisecondArray>()
165            .unwrap()
166            .reinterpret_cast::<arrow_array::types::Int64Type>(),
167        arrow_schema::TimeUnit::Microsecond => ts_array
168            .as_any()
169            .downcast_ref::<TimestampMicrosecondArray>()
170            .unwrap()
171            .reinterpret_cast::<arrow_array::types::Int64Type>(),
172        arrow_schema::TimeUnit::Nanosecond => ts_array
173            .as_any()
174            .downcast_ref::<TimestampNanosecondArray>()
175            .unwrap()
176            .reinterpret_cast::<arrow_array::types::Int64Type>(),
177    };
178    Some((ts_primitive, *unit))
179}
180
#[cfg(test)]
mod tests {
    use common_time::timezone::set_default_timezone;

    use super::*;

    /// Converting to JSON yields a string rendered in the default timezone.
    #[test]
    fn test_to_serde_json_value() {
        set_default_timezone(Some("Asia/Shanghai")).unwrap();

        let json = serde_json::Value::from(TimestampSecond::new(123));
        let serde_json::Value::String(rendered) = json else {
            unreachable!()
        };
        assert_eq!("1970-01-01 08:02:03+0800", rendered);
    }

    /// `as_scalar_ref` and `to_owned_scalar` both give back an equal value,
    /// for every unit-specific wrapper.
    #[test]
    fn test_timestamp_scalar() {
        // Runs the same two assertions for each listed wrapper type.
        macro_rules! assert_scalar_identity {
            ($($scalar: ident),+ $(,)?) => {
                $({
                    let ts = $scalar::new(123);
                    assert_eq!(ts, ts.as_scalar_ref());
                    assert_eq!(ts, ts.to_owned_scalar());
                })+
            };
        }

        assert_scalar_identity!(
            TimestampSecond,
            TimestampMillisecond,
            TimestampMicrosecond,
            TimestampNanosecond,
        );
    }
}