common_function/scalars/timestamp/
to_unixtime.rs1use std::fmt;
16use std::sync::Arc;
17
18use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
19use common_time::{Date, Timestamp};
20use datafusion_expr::{Signature, Volatility};
21use datatypes::arrow::datatypes::{DataType, TimeUnit};
22use datatypes::prelude::ConcreteDataType;
23use datatypes::vectors::{Int64Vector, VectorRef};
24use snafu::ensure;
25
26use crate::function::{Function, FunctionContext};
27
28#[derive(Clone, Debug, Default)]
30pub struct ToUnixtimeFunction;
31
32const NAME: &str = "to_unixtime";
33
34fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option<i64> {
35 let timezone = &func_ctx.query_ctx.timezone();
36 if let Ok(ts) = Timestamp::from_str(arg, Some(timezone)) {
37 return Some(ts.split().0);
38 }
39
40 if let Ok(date) = Date::from_str(arg, Some(timezone)) {
41 return Some(date.to_secs());
42 }
43
44 None
45}
46
47fn convert_timestamps_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
48 (0..vector.len())
49 .map(|i| vector.get(i).as_timestamp().map(|ts| ts.split().0))
50 .collect::<Vec<Option<i64>>>()
51}
52
53fn convert_dates_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
54 (0..vector.len())
55 .map(|i| vector.get(i).as_date().map(|dt| dt.to_secs()))
56 .collect::<Vec<Option<i64>>>()
57}
58
59impl Function for ToUnixtimeFunction {
60 fn name(&self) -> &str {
61 NAME
62 }
63
64 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
65 Ok(DataType::Int64)
66 }
67
68 fn signature(&self) -> Signature {
69 Signature::uniform(
70 1,
71 vec![
72 DataType::Utf8,
73 DataType::Int32,
74 DataType::Int64,
75 DataType::Date32,
76 DataType::Timestamp(TimeUnit::Second, None),
77 DataType::Timestamp(TimeUnit::Millisecond, None),
78 DataType::Timestamp(TimeUnit::Microsecond, None),
79 DataType::Timestamp(TimeUnit::Nanosecond, None),
80 ],
81 Volatility::Immutable,
82 )
83 }
84
85 fn eval(&self, ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
86 ensure!(
87 columns.len() == 1,
88 InvalidFuncArgsSnafu {
89 err_msg: format!(
90 "The length of the args is not correct, expect exactly one, have: {}",
91 columns.len()
92 ),
93 }
94 );
95
96 let vector = &columns[0];
97
98 match columns[0].data_type() {
99 ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from(
100 (0..vector.len())
101 .map(|i| convert_to_seconds(&vector.get(i).to_string(), ctx))
102 .collect::<Vec<_>>(),
103 ))),
104 ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => {
105 Ok(vector.cast(&ConcreteDataType::int64_datatype()).unwrap())
107 }
108 ConcreteDataType::Date(_) => {
109 let seconds = convert_dates_to_seconds(vector);
110 Ok(Arc::new(Int64Vector::from(seconds)))
111 }
112 ConcreteDataType::Timestamp(_) => {
113 let seconds = convert_timestamps_to_seconds(vector);
114 Ok(Arc::new(Int64Vector::from(seconds)))
115 }
116 _ => UnsupportedInputDataTypeSnafu {
117 function: NAME,
118 datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
119 }
120 .fail(),
121 }
122 }
123}
124
125impl fmt::Display for ToUnixtimeFunction {
126 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127 write!(f, "TO_UNIXTIME")
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use datafusion_expr::TypeSignature;
134 use datatypes::value::Value;
135 use datatypes::vectors::{
136 DateVector, StringVector, TimestampMillisecondVector, TimestampSecondVector,
137 };
138
139 use super::{ToUnixtimeFunction, *};
140
141 #[test]
142 fn test_string_to_unixtime() {
143 let f = ToUnixtimeFunction;
144 assert_eq!("to_unixtime", f.name());
145 assert_eq!(DataType::Int64, f.return_type(&[]).unwrap());
146
147 assert!(matches!(f.signature(),
148 Signature {
149 type_signature: TypeSignature::Uniform(1, valid_types),
150 volatility: Volatility::Immutable
151 } if valid_types == vec![
152 DataType::Utf8,
153 DataType::Int32,
154 DataType::Int64,
155 DataType::Date32,
156 DataType::Timestamp(TimeUnit::Second, None),
157 DataType::Timestamp(TimeUnit::Millisecond, None),
158 DataType::Timestamp(TimeUnit::Microsecond, None),
159 DataType::Timestamp(TimeUnit::Nanosecond, None),
160 ]
161 ));
162
163 let times = vec![
164 Some("2023-03-01T06:35:02Z"),
165 None,
166 Some("2022-06-30T23:59:60Z"),
167 Some("invalid_time_stamp"),
168 ];
169 let results = [Some(1677652502), None, Some(1656633600), None];
170 let args: Vec<VectorRef> = vec![Arc::new(StringVector::from(times.clone()))];
171 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
172 assert_eq!(4, vector.len());
173 for (i, _t) in times.iter().enumerate() {
174 let v = vector.get(i);
175 if i == 1 || i == 3 {
176 assert_eq!(Value::Null, v);
177 continue;
178 }
179 match v {
180 Value::Int64(ts) => {
181 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
182 }
183 _ => unreachable!(),
184 }
185 }
186 }
187
188 #[test]
189 fn test_int_to_unixtime() {
190 let f = ToUnixtimeFunction;
191
192 let times = vec![Some(3_i64), None, Some(5_i64), None];
193 let results = [Some(3), None, Some(5), None];
194 let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
195 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
196 assert_eq!(4, vector.len());
197 for (i, _t) in times.iter().enumerate() {
198 let v = vector.get(i);
199 if i == 1 || i == 3 {
200 assert_eq!(Value::Null, v);
201 continue;
202 }
203 match v {
204 Value::Int64(ts) => {
205 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
206 }
207 _ => unreachable!(),
208 }
209 }
210 }
211
212 #[test]
213 fn test_date_to_unixtime() {
214 let f = ToUnixtimeFunction;
215
216 let times = vec![Some(123), None, Some(42), None];
217 let results = [Some(10627200), None, Some(3628800), None];
218 let date_vector = DateVector::from(times.clone());
219 let args: Vec<VectorRef> = vec![Arc::new(date_vector)];
220 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
221 assert_eq!(4, vector.len());
222 for (i, _t) in times.iter().enumerate() {
223 let v = vector.get(i);
224 if i == 1 || i == 3 {
225 assert_eq!(Value::Null, v);
226 continue;
227 }
228 match v {
229 Value::Int64(ts) => {
230 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
231 }
232 _ => unreachable!(),
233 }
234 }
235 }
236
237 #[test]
238 fn test_timestamp_to_unixtime() {
239 let f = ToUnixtimeFunction;
240
241 let times = vec![Some(123), None, Some(42), None];
242 let results = [Some(123), None, Some(42), None];
243 let ts_vector = TimestampSecondVector::from(times.clone());
244 let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
245 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
246 assert_eq!(4, vector.len());
247 for (i, _t) in times.iter().enumerate() {
248 let v = vector.get(i);
249 if i == 1 || i == 3 {
250 assert_eq!(Value::Null, v);
251 continue;
252 }
253 match v {
254 Value::Int64(ts) => {
255 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
256 }
257 _ => unreachable!(),
258 }
259 }
260
261 let times = vec![Some(123000), None, Some(42000), None];
262 let results = [Some(123), None, Some(42), None];
263 let ts_vector = TimestampMillisecondVector::from(times.clone());
264 let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
265 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
266 assert_eq!(4, vector.len());
267 for (i, _t) in times.iter().enumerate() {
268 let v = vector.get(i);
269 if i == 1 || i == 3 {
270 assert_eq!(Value::Null, v);
271 continue;
272 }
273 match v {
274 Value::Int64(ts) => {
275 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
276 }
277 _ => unreachable!(),
278 }
279 }
280 }
281}