common_function/scalars/timestamp/to_unixtime.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::sync::Arc;
17
18use common_time::{Date, Timestamp};
19use datafusion_common::DataFusionError;
20use datafusion_common::arrow::array::{
21    Array, ArrayRef, AsArray, Date32Array, Int64Array, Int64Builder,
22};
23use datafusion_common::arrow::compute;
24use datafusion_common::arrow::datatypes::{ArrowTimestampType, DataType, Date32Type, TimeUnit};
25use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
26
27use crate::function::{Function, FunctionContext, extract_args, find_function_context};
28use crate::helper::with_match_timestamp_types;
29
30/// A function to convert the column into the unix timestamp in seconds.
31#[derive(Clone, Debug)]
32pub(crate) struct ToUnixtimeFunction {
33    signature: Signature,
34}
35
36impl Default for ToUnixtimeFunction {
37    fn default() -> Self {
38        Self {
39            signature: Signature::uniform(
40                1,
41                vec![
42                    DataType::Utf8,
43                    DataType::Int32,
44                    DataType::Int64,
45                    DataType::Date32,
46                    DataType::Timestamp(TimeUnit::Second, None),
47                    DataType::Timestamp(TimeUnit::Millisecond, None),
48                    DataType::Timestamp(TimeUnit::Microsecond, None),
49                    DataType::Timestamp(TimeUnit::Nanosecond, None),
50                ],
51                Volatility::Immutable,
52            ),
53        }
54    }
55}
56
/// Function name returned by [`ToUnixtimeFunction::name`].
const NAME: &str = "to_unixtime";
58
59fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option<i64> {
60    let timezone = &func_ctx.query_ctx.timezone();
61    if let Ok(ts) = Timestamp::from_str(arg, Some(timezone)) {
62        return Some(ts.split().0);
63    }
64
65    if let Ok(date) = Date::from_str(arg, Some(timezone)) {
66        return Some(date.to_secs());
67    }
68
69    None
70}
71
72fn convert_timestamps_to_seconds(array: &ArrayRef) -> datafusion_common::Result<Vec<Option<i64>>> {
73    with_match_timestamp_types!(array.data_type(), |$S| {
74        let array = array.as_primitive::<$S>();
75        array
76            .iter()
77            .map(|x| x.map(|i| Timestamp::new(i, $S::UNIT.into()).split().0))
78            .collect::<Vec<_>>()
79    })
80}
81
82fn convert_dates_to_seconds(vector: &Date32Array) -> Vec<Option<i64>> {
83    (0..vector.len())
84        .map(|i| {
85            vector
86                .is_valid(i)
87                .then(|| Date::from(vector.value(i)).to_secs())
88        })
89        .collect::<Vec<Option<i64>>>()
90}
91
92impl Function for ToUnixtimeFunction {
93    fn name(&self) -> &str {
94        NAME
95    }
96
97    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
98        Ok(DataType::Int64)
99    }
100
101    fn signature(&self) -> &Signature {
102        &self.signature
103    }
104
105    fn invoke_with_args(
106        &self,
107        args: ScalarFunctionArgs,
108    ) -> datafusion_common::Result<ColumnarValue> {
109        let ctx = find_function_context(&args)?;
110        let [arg0] = extract_args(self.name(), &args)?;
111        let result: ArrayRef = match arg0.data_type() {
112            DataType::Utf8 => {
113                let arg0 = arg0.as_string::<i32>();
114                let mut builder = Int64Builder::with_capacity(arg0.len());
115                for i in 0..arg0.len() {
116                    builder.append_option(
117                        arg0.is_valid(i)
118                            .then(|| convert_to_seconds(arg0.value(i), ctx))
119                            .flatten(),
120                    );
121                }
122                Arc::new(builder.finish())
123            }
124            DataType::Int64 | DataType::Int32 => compute::cast(&arg0, &DataType::Int64)?,
125            DataType::Date32 => {
126                let vector = arg0.as_primitive::<Date32Type>();
127                let seconds = convert_dates_to_seconds(vector);
128                Arc::new(Int64Array::from(seconds))
129            }
130            DataType::Timestamp(_, _) => {
131                let seconds = convert_timestamps_to_seconds(&arg0)?;
132                Arc::new(Int64Array::from(seconds))
133            }
134            x => {
135                return Err(DataFusionError::Execution(format!(
136                    "{}: unsupported input data type {x}",
137                    self.name()
138                )));
139            }
140        };
141        Ok(ColumnarValue::Array(result))
142    }
143}
144
145impl fmt::Display for ToUnixtimeFunction {
146    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
147        write!(f, "TO_UNIXTIME")
148    }
149}
150
#[cfg(test)]
mod tests {
    use arrow_schema::Field;
    use datafusion_common::arrow::array::{
        StringArray, TimestampMillisecondArray, TimestampSecondArray,
    };
    use datafusion_common::arrow::datatypes::Int64Type;
    use datafusion_common::config::ConfigOptions;

    use super::*;

    /// Runs `to_unixtime` over `arg0` with a default [`FunctionContext`] registered in the
    /// config extensions. Asserts that the resulting `Int64` column equals `expects`.
    fn test_invoke(arg0: ArrayRef, expects: &[Option<i64>]) {
        let config_options = {
            let mut options = ConfigOptions::default();
            options.extensions.insert(FunctionContext::default());
            Arc::new(options)
        };

        let number_rows = arg0.len();
        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(arg0)],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("", DataType::Int64, true)),
            config_options,
        };

        let output = ToUnixtimeFunction::default()
            .invoke_with_args(args)
            .and_then(|value| value.to_array(number_rows))
            .unwrap();
        let actual: Vec<Option<i64>> = output.as_primitive::<Int64Type>().iter().collect();

        assert_eq!(actual, expects);
    }

    #[test]
    fn test_string_to_unixtime() {
        let function = ToUnixtimeFunction::default();
        assert_eq!("to_unixtime", function.name());
        assert_eq!(DataType::Int64, function.return_type(&[]).unwrap());

        let arg0: ArrayRef = Arc::new(StringArray::from(vec![
            Some("2023-03-01T06:35:02Z"),
            None,
            // Leap second: expected to land on 2022-07-01T00:00:00Z.
            Some("2022-06-30T23:59:60Z"),
            Some("invalid_time_stamp"),
        ]));
        test_invoke(arg0, &[Some(1677652502), None, Some(1656633600), None]);
    }

    #[test]
    fn test_int_to_unixtime() {
        let arg0: ArrayRef = Arc::new(Int64Array::from(vec![Some(3_i64), None, Some(5_i64), None]));
        test_invoke(arg0, &[Some(3), None, Some(5), None]);
    }

    #[test]
    fn test_date_to_unixtime() {
        // Days since the epoch; each day contributes 86_400 seconds.
        let arg0: ArrayRef = Arc::new(Date32Array::from(vec![Some(123), None, Some(42), None]));
        test_invoke(arg0, &[Some(10627200), None, Some(3628800), None]);
    }

    #[test]
    fn test_timestamp_to_unixtime() {
        let expected = [Some(123), None, Some(42), None];

        let seconds: ArrayRef =
            Arc::new(TimestampSecondArray::from(vec![Some(123), None, Some(42), None]));
        test_invoke(seconds, &expected);

        let millis: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![
            Some(123000),
            None,
            Some(42000),
            None,
        ]));
        test_invoke(millis, &expected);
    }
}
232}