common_function/scalars/timestamp/
to_unixtime.rs1use std::fmt;
16use std::sync::Arc;
17
18use common_time::{Date, Timestamp};
19use datafusion_common::DataFusionError;
20use datafusion_common::arrow::array::{
21 Array, ArrayRef, AsArray, Date32Array, Int64Array, Int64Builder,
22};
23use datafusion_common::arrow::compute;
24use datafusion_common::arrow::datatypes::{ArrowTimestampType, DataType, Date32Type, TimeUnit};
25use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
26
27use crate::function::{Function, FunctionContext, extract_args, find_function_context};
28use crate::helper::with_match_timestamp_types;
29
30#[derive(Clone, Debug)]
32pub(crate) struct ToUnixtimeFunction {
33 signature: Signature,
34}
35
36impl Default for ToUnixtimeFunction {
37 fn default() -> Self {
38 Self {
39 signature: Signature::uniform(
40 1,
41 vec![
42 DataType::Utf8,
43 DataType::Int32,
44 DataType::Int64,
45 DataType::Date32,
46 DataType::Timestamp(TimeUnit::Second, None),
47 DataType::Timestamp(TimeUnit::Millisecond, None),
48 DataType::Timestamp(TimeUnit::Microsecond, None),
49 DataType::Timestamp(TimeUnit::Nanosecond, None),
50 ],
51 Volatility::Immutable,
52 ),
53 }
54 }
55}
56
/// Name under which the function is exposed (returned by `Function::name`).
const NAME: &str = "to_unixtime";
58
59fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option<i64> {
60 let timezone = &func_ctx.query_ctx.timezone();
61 if let Ok(ts) = Timestamp::from_str(arg, Some(timezone)) {
62 return Some(ts.split().0);
63 }
64
65 if let Ok(date) = Date::from_str(arg, Some(timezone)) {
66 return Some(date.to_secs());
67 }
68
69 None
70}
71
72fn convert_timestamps_to_seconds(array: &ArrayRef) -> datafusion_common::Result<Vec<Option<i64>>> {
73 with_match_timestamp_types!(array.data_type(), |$S| {
74 let array = array.as_primitive::<$S>();
75 array
76 .iter()
77 .map(|x| x.map(|i| Timestamp::new(i, $S::UNIT.into()).split().0))
78 .collect::<Vec<_>>()
79 })
80}
81
82fn convert_dates_to_seconds(vector: &Date32Array) -> Vec<Option<i64>> {
83 (0..vector.len())
84 .map(|i| {
85 vector
86 .is_valid(i)
87 .then(|| Date::from(vector.value(i)).to_secs())
88 })
89 .collect::<Vec<Option<i64>>>()
90}
91
92impl Function for ToUnixtimeFunction {
93 fn name(&self) -> &str {
94 NAME
95 }
96
97 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
98 Ok(DataType::Int64)
99 }
100
101 fn signature(&self) -> &Signature {
102 &self.signature
103 }
104
105 fn invoke_with_args(
106 &self,
107 args: ScalarFunctionArgs,
108 ) -> datafusion_common::Result<ColumnarValue> {
109 let ctx = find_function_context(&args)?;
110 let [arg0] = extract_args(self.name(), &args)?;
111 let result: ArrayRef = match arg0.data_type() {
112 DataType::Utf8 => {
113 let arg0 = arg0.as_string::<i32>();
114 let mut builder = Int64Builder::with_capacity(arg0.len());
115 for i in 0..arg0.len() {
116 builder.append_option(
117 arg0.is_valid(i)
118 .then(|| convert_to_seconds(arg0.value(i), ctx))
119 .flatten(),
120 );
121 }
122 Arc::new(builder.finish())
123 }
124 DataType::Int64 | DataType::Int32 => compute::cast(&arg0, &DataType::Int64)?,
125 DataType::Date32 => {
126 let vector = arg0.as_primitive::<Date32Type>();
127 let seconds = convert_dates_to_seconds(vector);
128 Arc::new(Int64Array::from(seconds))
129 }
130 DataType::Timestamp(_, _) => {
131 let seconds = convert_timestamps_to_seconds(&arg0)?;
132 Arc::new(Int64Array::from(seconds))
133 }
134 x => {
135 return Err(DataFusionError::Execution(format!(
136 "{}: unsupported input data type {x}",
137 self.name()
138 )));
139 }
140 };
141 Ok(ColumnarValue::Array(result))
142 }
143}
144
145impl fmt::Display for ToUnixtimeFunction {
146 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
147 write!(f, "TO_UNIXTIME")
148 }
149}
150
#[cfg(test)]
mod tests {
    use arrow_schema::Field;
    use datafusion_common::arrow::array::{
        StringArray, TimestampMillisecondArray, TimestampSecondArray,
    };
    use datafusion_common::arrow::datatypes::Int64Type;
    use datafusion_common::config::ConfigOptions;

    use super::*;

    /// Invokes `to_unixtime` on `arg0` with a default `FunctionContext` and
    /// checks that the resulting `Int64` array equals `expects`.
    fn test_invoke(arg0: ArrayRef, expects: &[Option<i64>]) {
        let number_rows = arg0.len();

        // The function looks its context up in the config extensions.
        let mut config_options = ConfigOptions::default();
        config_options.extensions.insert(FunctionContext::default());

        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(arg0)],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("", DataType::Int64, true)),
            config_options: Arc::new(config_options),
        };

        let output = ToUnixtimeFunction::default()
            .invoke_with_args(args)
            .and_then(|value| value.to_array(number_rows))
            .unwrap();
        let actual: Vec<Option<i64>> = output.as_primitive::<Int64Type>().iter().collect();

        assert_eq!(actual.as_slice(), expects);
    }

    #[test]
    fn test_string_to_unixtime() {
        let function = ToUnixtimeFunction::default();
        assert_eq!("to_unixtime", function.name());
        assert_eq!(DataType::Int64, function.return_type(&[]).unwrap());

        let input: ArrayRef = Arc::new(StringArray::from(vec![
            Some("2023-03-01T06:35:02Z"),
            None,
            Some("2022-06-30T23:59:60Z"),
            Some("invalid_time_stamp"),
        ]));
        test_invoke(input, &[Some(1677652502), None, Some(1656633600), None]);
    }

    #[test]
    fn test_int_to_unixtime() {
        let input: ArrayRef = Arc::new(Int64Array::from(vec![
            Some(3_i64),
            None,
            Some(5_i64),
            None,
        ]));
        test_invoke(input, &[Some(3), None, Some(5), None]);
    }

    #[test]
    fn test_date_to_unixtime() {
        let input: ArrayRef = Arc::new(Date32Array::from(vec![Some(123), None, Some(42), None]));
        test_invoke(input, &[Some(10627200), None, Some(3628800), None]);
    }

    #[test]
    fn test_timestamp_to_unixtime() {
        let expected = [Some(123), None, Some(42), None];

        // Second precision: values pass through unchanged.
        let seconds: ArrayRef = Arc::new(TimestampSecondArray::from(vec![
            Some(123),
            None,
            Some(42),
            None,
        ]));
        test_invoke(seconds, &expected);

        // Millisecond precision: values are reduced to whole seconds.
        let millis: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![
            Some(123000),
            None,
            Some(42000),
            None,
        ]));
        test_invoke(millis, &expected);
    }
}