common_function/scalars/timestamp/
to_unixtime.rs1use std::fmt;
16use std::sync::Arc;
17
18use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
19use common_query::prelude::{Signature, Volatility};
20use common_time::{Date, Timestamp};
21use datatypes::prelude::ConcreteDataType;
22use datatypes::vectors::{Int64Vector, VectorRef};
23use snafu::ensure;
24
25use crate::function::{Function, FunctionContext};
26
27#[derive(Clone, Debug, Default)]
29pub struct ToUnixtimeFunction;
30
31const NAME: &str = "to_unixtime";
32
33fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option<i64> {
34 let timezone = &func_ctx.query_ctx.timezone();
35 if let Ok(ts) = Timestamp::from_str(arg, Some(timezone)) {
36 return Some(ts.split().0);
37 }
38
39 if let Ok(date) = Date::from_str(arg, Some(timezone)) {
40 return Some(date.to_secs());
41 }
42
43 None
44}
45
46fn convert_timestamps_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
47 (0..vector.len())
48 .map(|i| vector.get(i).as_timestamp().map(|ts| ts.split().0))
49 .collect::<Vec<Option<i64>>>()
50}
51
52fn convert_dates_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
53 (0..vector.len())
54 .map(|i| vector.get(i).as_date().map(|dt| dt.to_secs()))
55 .collect::<Vec<Option<i64>>>()
56}
57
58impl Function for ToUnixtimeFunction {
59 fn name(&self) -> &str {
60 NAME
61 }
62
63 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
64 Ok(ConcreteDataType::int64_datatype())
65 }
66
67 fn signature(&self) -> Signature {
68 Signature::uniform(
69 1,
70 vec![
71 ConcreteDataType::string_datatype(),
72 ConcreteDataType::int32_datatype(),
73 ConcreteDataType::int64_datatype(),
74 ConcreteDataType::date_datatype(),
75 ConcreteDataType::timestamp_second_datatype(),
76 ConcreteDataType::timestamp_millisecond_datatype(),
77 ConcreteDataType::timestamp_microsecond_datatype(),
78 ConcreteDataType::timestamp_nanosecond_datatype(),
79 ],
80 Volatility::Immutable,
81 )
82 }
83
84 fn eval(&self, ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
85 ensure!(
86 columns.len() == 1,
87 InvalidFuncArgsSnafu {
88 err_msg: format!(
89 "The length of the args is not correct, expect exactly one, have: {}",
90 columns.len()
91 ),
92 }
93 );
94
95 let vector = &columns[0];
96
97 match columns[0].data_type() {
98 ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from(
99 (0..vector.len())
100 .map(|i| convert_to_seconds(&vector.get(i).to_string(), ctx))
101 .collect::<Vec<_>>(),
102 ))),
103 ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => {
104 Ok(vector.cast(&ConcreteDataType::int64_datatype()).unwrap())
106 }
107 ConcreteDataType::Date(_) => {
108 let seconds = convert_dates_to_seconds(vector);
109 Ok(Arc::new(Int64Vector::from(seconds)))
110 }
111 ConcreteDataType::Timestamp(_) => {
112 let seconds = convert_timestamps_to_seconds(vector);
113 Ok(Arc::new(Int64Vector::from(seconds)))
114 }
115 _ => UnsupportedInputDataTypeSnafu {
116 function: NAME,
117 datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
118 }
119 .fail(),
120 }
121 }
122}
123
124impl fmt::Display for ToUnixtimeFunction {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 write!(f, "TO_UNIXTIME")
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use common_query::prelude::TypeSignature;
133 use datatypes::prelude::ConcreteDataType;
134 use datatypes::value::Value;
135 use datatypes::vectors::{
136 DateVector, StringVector, TimestampMillisecondVector, TimestampSecondVector,
137 };
138
139 use super::{ToUnixtimeFunction, *};
140
141 #[test]
142 fn test_string_to_unixtime() {
143 let f = ToUnixtimeFunction;
144 assert_eq!("to_unixtime", f.name());
145 assert_eq!(
146 ConcreteDataType::int64_datatype(),
147 f.return_type(&[]).unwrap()
148 );
149
150 assert!(matches!(f.signature(),
151 Signature {
152 type_signature: TypeSignature::Uniform(1, valid_types),
153 volatility: Volatility::Immutable
154 } if valid_types == vec![
155 ConcreteDataType::string_datatype(),
156 ConcreteDataType::int32_datatype(),
157 ConcreteDataType::int64_datatype(),
158 ConcreteDataType::date_datatype(),
159 ConcreteDataType::timestamp_second_datatype(),
160 ConcreteDataType::timestamp_millisecond_datatype(),
161 ConcreteDataType::timestamp_microsecond_datatype(),
162 ConcreteDataType::timestamp_nanosecond_datatype(),
163 ]
164 ));
165
166 let times = vec![
167 Some("2023-03-01T06:35:02Z"),
168 None,
169 Some("2022-06-30T23:59:60Z"),
170 Some("invalid_time_stamp"),
171 ];
172 let results = [Some(1677652502), None, Some(1656633600), None];
173 let args: Vec<VectorRef> = vec![Arc::new(StringVector::from(times.clone()))];
174 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
175 assert_eq!(4, vector.len());
176 for (i, _t) in times.iter().enumerate() {
177 let v = vector.get(i);
178 if i == 1 || i == 3 {
179 assert_eq!(Value::Null, v);
180 continue;
181 }
182 match v {
183 Value::Int64(ts) => {
184 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
185 }
186 _ => unreachable!(),
187 }
188 }
189 }
190
191 #[test]
192 fn test_int_to_unixtime() {
193 let f = ToUnixtimeFunction;
194
195 let times = vec![Some(3_i64), None, Some(5_i64), None];
196 let results = [Some(3), None, Some(5), None];
197 let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
198 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
199 assert_eq!(4, vector.len());
200 for (i, _t) in times.iter().enumerate() {
201 let v = vector.get(i);
202 if i == 1 || i == 3 {
203 assert_eq!(Value::Null, v);
204 continue;
205 }
206 match v {
207 Value::Int64(ts) => {
208 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
209 }
210 _ => unreachable!(),
211 }
212 }
213 }
214
215 #[test]
216 fn test_date_to_unixtime() {
217 let f = ToUnixtimeFunction;
218
219 let times = vec![Some(123), None, Some(42), None];
220 let results = [Some(10627200), None, Some(3628800), None];
221 let date_vector = DateVector::from(times.clone());
222 let args: Vec<VectorRef> = vec![Arc::new(date_vector)];
223 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
224 assert_eq!(4, vector.len());
225 for (i, _t) in times.iter().enumerate() {
226 let v = vector.get(i);
227 if i == 1 || i == 3 {
228 assert_eq!(Value::Null, v);
229 continue;
230 }
231 match v {
232 Value::Int64(ts) => {
233 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
234 }
235 _ => unreachable!(),
236 }
237 }
238 }
239
240 #[test]
241 fn test_timestamp_to_unixtime() {
242 let f = ToUnixtimeFunction;
243
244 let times = vec![Some(123), None, Some(42), None];
245 let results = [Some(123), None, Some(42), None];
246 let ts_vector = TimestampSecondVector::from(times.clone());
247 let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
248 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
249 assert_eq!(4, vector.len());
250 for (i, _t) in times.iter().enumerate() {
251 let v = vector.get(i);
252 if i == 1 || i == 3 {
253 assert_eq!(Value::Null, v);
254 continue;
255 }
256 match v {
257 Value::Int64(ts) => {
258 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
259 }
260 _ => unreachable!(),
261 }
262 }
263
264 let times = vec![Some(123000), None, Some(42000), None];
265 let results = [Some(123), None, Some(42), None];
266 let ts_vector = TimestampMillisecondVector::from(times.clone());
267 let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
268 let vector = f.eval(&FunctionContext::default(), &args).unwrap();
269 assert_eq!(4, vector.len());
270 for (i, _t) in times.iter().enumerate() {
271 let v = vector.get(i);
272 if i == 1 || i == 3 {
273 assert_eq!(Value::Null, v);
274 continue;
275 }
276 match v {
277 Value::Int64(ts) => {
278 assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
279 }
280 _ => unreachable!(),
281 }
282 }
283 }
284}