1use arrow_array::{
16 ArrayRef, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
17 TimestampNanosecondArray, TimestampSecondArray,
18};
19use arrow_schema::DataType;
20use common_time::timestamp::TimeUnit;
21use common_time::Timestamp;
22use paste::paste;
23use serde::{Deserialize, Serialize};
24
25use crate::prelude::{Scalar, Value, ValueRef};
26use crate::scalars::ScalarRef;
27use crate::types::{
28 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
29 TimestampSecondType, WrapperType,
30};
31use crate::vectors::{
32 TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
33 TimestampSecondVector,
34};
35
/// Defines a `Timestamp<Unit>` newtype around `Timestamp` for one `TimeUnit`
/// variant, together with its conversions and its `Scalar`, `ScalarRef` and
/// `WrapperType` implementations.
///
/// `$unit` must name a `TimeUnit` variant. The expansion also refers to
/// `Timestamp<Unit>Vector` and `Timestamp<Unit>Type`, so both must be in scope
/// where the macro is invoked.
macro_rules! define_timestamp_with_unit {
    ($unit: ident) => {
        paste! {
            /// Newtype over `Timestamp`; `new` and `from_native` always build
            /// the inner value with this type's unit.
            #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
            pub struct [<Timestamp $unit>](pub Timestamp);

            impl [<Timestamp $unit>] {
                /// Wraps `Timestamp::new(val, unit)` where `unit` is this type's unit.
                pub fn new(val: i64) -> Self {
                    let inner = Timestamp::new(val, TimeUnit::$unit);
                    Self(inner)
                }
            }

            // The default value is the timestamp built from raw value `0`.
            impl Default for [<Timestamp $unit>] {
                fn default() -> Self {
                    Self::new(0)
                }
            }

            impl From<[<Timestamp $unit>]> for Value {
                fn from(ts: [<Timestamp $unit>]) -> Self {
                    Value::Timestamp(ts.0)
                }
            }

            // Delegates to the inner `Timestamp`'s own JSON conversion.
            impl From<[<Timestamp $unit>]> for serde_json::Value {
                fn from(ts: [<Timestamp $unit>]) -> Self {
                    ts.0.into()
                }
            }

            impl From<[<Timestamp $unit>]> for ValueRef<'static> {
                fn from(ts: [<Timestamp $unit>]) -> Self {
                    ValueRef::Timestamp(ts.0)
                }
            }

            // The type is `Copy`, so it serves as its own reference type.
            impl Scalar for [<Timestamp $unit>] {
                type VectorType = [<Timestamp $unit Vector>];
                type RefType<'a> = Self;

                fn as_scalar_ref(&self) -> Self::RefType<'_> {
                    *self
                }

                fn upcast_gat<'short, 'long: 'short>(
                    long: Self::RefType<'long>,
                ) -> Self::RefType<'short> {
                    long
                }
            }

            impl<'a> ScalarRef<'a> for [<Timestamp $unit>] {
                type ScalarType = Self;

                fn to_owned_scalar(&self) -> Self::ScalarType {
                    *self
                }
            }

            impl WrapperType for [<Timestamp $unit>] {
                type LogicalType = [<Timestamp $unit Type>];
                type Native = i64;

                fn from_native(value: Self::Native) -> Self {
                    Self::new(value)
                }

                fn into_native(self) -> Self::Native {
                    self.0.into()
                }
            }

            impl From<i64> for [<Timestamp $unit>] {
                fn from(raw: i64) -> Self {
                    Self::from_native(raw)
                }
            }

            impl From<[<Timestamp $unit>]> for i64 {
                fn from(ts: [<Timestamp $unit>]) -> Self {
                    ts.0.value()
                }
            }

            // `Value::Null` maps to `None`; a `Value::Timestamp` is accepted only
            // when its unit matches this type. Every other value is an error.
            impl TryFrom<Value> for Option<[<Timestamp $unit>]> {
                type Error = $crate::error::Error;

                #[inline]
                fn try_from(from: Value) -> std::result::Result<Self, Self::Error> {
                    match from {
                        Value::Null => Ok(None),
                        Value::Timestamp(ts) if ts.unit() == TimeUnit::$unit => {
                            Ok(Some([<Timestamp $unit>](ts)))
                        }
                        other => {
                            let reason = format!(
                                "{:?} is not a {}",
                                other,
                                stringify!([<Timestamp $unit>])
                            );
                            $crate::error::TryFromValueSnafu { reason }.fail()
                        }
                    }
                }
            }
        }
    };
}
140
141define_timestamp_with_unit!(Second);
142define_timestamp_with_unit!(Millisecond);
143define_timestamp_with_unit!(Microsecond);
144define_timestamp_with_unit!(Nanosecond);
145
146pub fn timestamp_array_to_primitive(
147 ts_array: &ArrayRef,
148) -> Option<(
149 PrimitiveArray<arrow_array::types::Int64Type>,
150 arrow::datatypes::TimeUnit,
151)> {
152 let DataType::Timestamp(unit, _) = ts_array.data_type() else {
153 return None;
154 };
155
156 let ts_primitive = match unit {
157 arrow_schema::TimeUnit::Second => ts_array
158 .as_any()
159 .downcast_ref::<TimestampSecondArray>()
160 .unwrap()
161 .reinterpret_cast::<arrow_array::types::Int64Type>(),
162 arrow_schema::TimeUnit::Millisecond => ts_array
163 .as_any()
164 .downcast_ref::<TimestampMillisecondArray>()
165 .unwrap()
166 .reinterpret_cast::<arrow_array::types::Int64Type>(),
167 arrow_schema::TimeUnit::Microsecond => ts_array
168 .as_any()
169 .downcast_ref::<TimestampMicrosecondArray>()
170 .unwrap()
171 .reinterpret_cast::<arrow_array::types::Int64Type>(),
172 arrow_schema::TimeUnit::Nanosecond => ts_array
173 .as_any()
174 .downcast_ref::<TimestampNanosecondArray>()
175 .unwrap()
176 .reinterpret_cast::<arrow_array::types::Int64Type>(),
177 };
178 Some((ts_primitive, *unit))
179}
180
#[cfg(test)]
mod tests {
    use common_time::timezone::set_default_timezone;

    use super::*;

    /// Converting to `serde_json::Value` yields a string rendered in the
    /// default timezone configured below.
    #[test]
    fn test_to_serde_json_value() {
        set_default_timezone(Some("Asia/Shanghai")).unwrap();

        let json = serde_json::Value::from(TimestampSecond::new(123));
        let serde_json::Value::String(rendered) = json else {
            unreachable!()
        };
        assert_eq!("1970-01-01 08:02:03+0800", rendered);
    }

    /// `as_scalar_ref` and `to_owned_scalar` both return a value equal to the
    /// original, for every unit.
    #[test]
    fn test_timestamp_scalar() {
        // Runs the same pair of assertions for one wrapper type.
        macro_rules! assert_scalar_round_trip {
            ($ty:ty) => {{
                let ts = <$ty>::new(123);
                assert_eq!(ts, ts.as_scalar_ref());
                assert_eq!(ts, ts.to_owned_scalar());
            }};
        }

        assert_scalar_round_trip!(TimestampSecond);
        assert_scalar_round_trip!(TimestampMillisecond);
        assert_scalar_round_trip!(TimestampMicrosecond);
        assert_scalar_round_trip!(TimestampNanosecond);
    }
}