1use arrow::datatypes::{
16 DataType as ArrowDataType, TimeUnit as ArrowTimeUnit,
17 TimestampMicrosecondType as ArrowTimestampMicrosecondType,
18 TimestampMillisecondType as ArrowTimestampMillisecondType,
19 TimestampNanosecondType as ArrowTimestampNanosecondType,
20 TimestampSecondType as ArrowTimestampSecondType,
21};
22use common_time::timestamp::TimeUnit;
23use common_time::Timestamp;
24use enum_dispatch::enum_dispatch;
25use paste::paste;
26use serde::{Deserialize, Serialize};
27use snafu::OptionExt;
28
29use crate::data_type::ConcreteDataType;
30use crate::error;
31use crate::error::InvalidTimestampPrecisionSnafu;
32use crate::prelude::{
33 DataType, LogicalTypeId, MutableVector, ScalarVectorBuilder, Value, ValueRef, Vector,
34};
35use crate::timestamp::{
36 TimestampMicrosecond, TimestampMillisecond, TimestampNanosecond, TimestampSecond,
37};
38use crate::types::LogicalPrimitiveType;
39use crate::vectors::{
40 PrimitiveVector, TimestampMicrosecondVector, TimestampMicrosecondVectorBuilder,
41 TimestampMillisecondVector, TimestampMillisecondVectorBuilder, TimestampNanosecondVector,
42 TimestampNanosecondVectorBuilder, TimestampSecondVector, TimestampSecondVectorBuilder,
43};
44
45const SECOND_VARIATION: u64 = 0;
46const MILLISECOND_VARIATION: u64 = 3;
47const MICROSECOND_VARIATION: u64 = 6;
48const NANOSECOND_VARIATION: u64 = 9;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
51#[enum_dispatch(DataType)]
52pub enum TimestampType {
53 Second(TimestampSecondType),
54 Millisecond(TimestampMillisecondType),
55 Microsecond(TimestampMicrosecondType),
56 Nanosecond(TimestampNanosecondType),
57}
58
59impl TryFrom<u64> for TimestampType {
60 type Error = error::Error;
61
62 fn try_from(value: u64) -> Result<Self, Self::Error> {
68 match value {
69 SECOND_VARIATION => Ok(TimestampType::Second(TimestampSecondType)),
70 MILLISECOND_VARIATION => Ok(TimestampType::Millisecond(TimestampMillisecondType)),
71 MICROSECOND_VARIATION => Ok(TimestampType::Microsecond(TimestampMicrosecondType)),
72 NANOSECOND_VARIATION => Ok(TimestampType::Nanosecond(TimestampNanosecondType)),
73 _ => InvalidTimestampPrecisionSnafu { precision: value }.fail(),
74 }
75 }
76}
77
78impl TimestampType {
79 pub fn unit(&self) -> TimeUnit {
81 match self {
82 TimestampType::Second(_) => TimeUnit::Second,
83 TimestampType::Millisecond(_) => TimeUnit::Millisecond,
84 TimestampType::Microsecond(_) => TimeUnit::Microsecond,
85 TimestampType::Nanosecond(_) => TimeUnit::Nanosecond,
86 }
87 }
88
89 pub fn create_timestamp(&self, val: i64) -> Timestamp {
90 Timestamp::new(val, self.unit())
91 }
92
93 pub fn precision(&self) -> u64 {
94 match self {
95 TimestampType::Second(_) => SECOND_VARIATION,
96 TimestampType::Millisecond(_) => MILLISECOND_VARIATION,
97 TimestampType::Microsecond(_) => MICROSECOND_VARIATION,
98 TimestampType::Nanosecond(_) => NANOSECOND_VARIATION,
99 }
100 }
101}
102
103macro_rules! impl_data_type_for_timestamp {
104 ($unit: ident) => {
105 paste! {
106 #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
107 pub struct [<Timestamp $unit Type>];
108
109 impl DataType for [<Timestamp $unit Type>] {
110 fn name(&self) -> String {
111 stringify!([<Timestamp $unit>]).to_string()
112 }
113
114 fn logical_type_id(&self) -> LogicalTypeId {
115 LogicalTypeId::[<Timestamp $unit>]
116 }
117
118 fn default_value(&self) -> Value {
119 Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
120 }
121
122 fn as_arrow_type(&self) -> ArrowDataType {
123 ArrowDataType::Timestamp(ArrowTimeUnit::$unit, None)
124 }
125
126 fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
127 Box::new([<Timestamp $unit Vector Builder>]::with_capacity(capacity))
128 }
129
130 fn try_cast(&self, from: Value)-> Option<Value>{
131 match from {
132 Value::Timestamp(v) => v.convert_to(TimeUnit::$unit).map(Value::Timestamp),
133 Value::String(v) => Timestamp::from_str_utc(v.as_utf8()).map(Value::Timestamp).ok(),
134 Value::Int64(v) => Some(Value::Timestamp(Timestamp::new(v, TimeUnit::$unit))),
135 Value::Date(v) => Timestamp::new_second(v.to_secs()).convert_to(TimeUnit::$unit).map(Value::Timestamp),
136 _ => None
137 }
138 }
139 }
140
141 impl LogicalPrimitiveType for [<Timestamp $unit Type>] {
142 type ArrowPrimitive = [<Arrow Timestamp $unit Type>];
143 type Native = i64;
144 type Wrapper = [<Timestamp $unit>];
145 type LargestType = Self;
146
147 fn build_data_type() -> ConcreteDataType {
148 ConcreteDataType::Timestamp(TimestampType::$unit(
149 [<Timestamp $unit Type>]::default(),
150 ))
151 }
152
153 fn type_name() -> &'static str {
154 stringify!([<Timestamp $unit Type>])
155 }
156
157 fn cast_vector(vector: &dyn Vector) -> crate::Result<&PrimitiveVector<Self>> {
158 vector
159 .as_any()
160 .downcast_ref::<[<Timestamp $unit Vector>]>()
161 .with_context(|| error::CastTypeSnafu {
162 msg: format!(
163 "Failed to cast {} to {}",
164 vector.vector_type_name(), stringify!([<Timestamp $unit Vector>])
165 ),
166 })
167 }
168
169 fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
170 match value {
171 ValueRef::Null => Ok(None),
172 ValueRef::Int64(v) =>{
173 Ok(Some([<Timestamp $unit>]::from(v)))
174 }
175 ValueRef::Timestamp(t) => match t.unit() {
176 TimeUnit::$unit => Ok(Some([<Timestamp $unit>](t))),
177 other => error::CastTypeSnafu {
178 msg: format!(
179 "Failed to cast Timestamp value with different unit {:?} to {}",
180 other, stringify!([<Timestamp $unit>])
181 ),
182 }
183 .fail(),
184 },
185 other => error::CastTypeSnafu {
186 msg: format!("Failed to cast value {:?} to {}", other, stringify!([<Timestamp $unit>])),
187 }
188 .fail(),
189 }
190 }
191 }
192 }
193 }
194}
195
196impl_data_type_for_timestamp!(Nanosecond);
197impl_data_type_for_timestamp!(Second);
198impl_data_type_for_timestamp!(Millisecond);
199impl_data_type_for_timestamp!(Microsecond);
200
201#[cfg(test)]
202mod tests {
203 use common_time::timezone::set_default_timezone;
204 use common_time::Date;
205
206 use super::*;
207
208 #[test]
209 fn test_timestamp_type_unit() {
210 assert_eq!(
211 TimeUnit::Second,
212 TimestampType::Second(TimestampSecondType).unit()
213 );
214 assert_eq!(
215 TimeUnit::Millisecond,
216 TimestampType::Millisecond(TimestampMillisecondType).unit()
217 );
218 assert_eq!(
219 TimeUnit::Microsecond,
220 TimestampType::Microsecond(TimestampMicrosecondType).unit()
221 );
222 assert_eq!(
223 TimeUnit::Nanosecond,
224 TimestampType::Nanosecond(TimestampNanosecondType).unit()
225 );
226 }
227
228 #[test]
230 fn test_timestamp_cast() {
231 set_default_timezone(Some("Asia/Shanghai")).unwrap();
232 let s = Value::String("2021-01-01 01:02:03".to_string().into());
234 let ts = ConcreteDataType::timestamp_second_datatype()
235 .try_cast(s)
236 .unwrap();
237 assert_eq!(ts, Value::Timestamp(Timestamp::new_second(1609462923)));
239 let s = Value::String("12345".to_string().into());
241 let ts = ConcreteDataType::timestamp_second_datatype().try_cast(s);
242 assert_eq!(ts, None);
243
244 let n = Value::Int64(1694589525);
245 let ts = ConcreteDataType::timestamp_second_datatype()
247 .try_cast(n)
248 .unwrap();
249 assert_eq!(ts, Value::Timestamp(Timestamp::new_second(1694589525)));
250
251 let d = Value::Date(Date::from_str_utc("1970-01-01").unwrap());
253 let ts = ConcreteDataType::timestamp_millisecond_datatype()
254 .try_cast(d)
255 .unwrap();
256 assert_eq!(ts, Value::Timestamp(Timestamp::new_millisecond(0)));
257
258 let second = Value::Timestamp(Timestamp::new_second(123));
260 let microsecond = ConcreteDataType::timestamp_microsecond_datatype()
261 .try_cast(second)
262 .unwrap();
263 assert_eq!(
264 microsecond,
265 Value::Timestamp(Timestamp::new_microsecond(123 * 1000000))
266 )
267 }
268}