common_function/scalars/date/
date_format.rs1use std::fmt;
16use std::sync::Arc;
17
18use common_error::ext::BoxedError;
19use common_query::error;
20use common_time::{Date, Timestamp};
21use datafusion_common::DataFusionError;
22use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
23use datafusion_common::arrow::datatypes::{
24 ArrowTimestampType, DataType, Date32Type, Date64Type, TimeUnit,
25};
26use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
27use snafu::ResultExt;
28
29use crate::function::{Function, extract_args, find_function_context};
30use crate::helper;
31use crate::helper::with_match_timestamp_types;
32
33#[derive(Clone, Debug)]
35pub(crate) struct DateFormatFunction {
36 signature: Signature,
37}
38
39impl Default for DateFormatFunction {
40 fn default() -> Self {
41 Self {
42 signature: helper::one_of_sigs2(
43 vec![
44 DataType::Date32,
45 DataType::Date64,
46 DataType::Timestamp(TimeUnit::Second, None),
47 DataType::Timestamp(TimeUnit::Millisecond, None),
48 DataType::Timestamp(TimeUnit::Microsecond, None),
49 DataType::Timestamp(TimeUnit::Nanosecond, None),
50 ],
51 vec![DataType::Utf8],
52 ),
53 }
54 }
55}
56
57impl Function for DateFormatFunction {
58 fn name(&self) -> &str {
59 "date_format"
60 }
61
62 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
63 Ok(DataType::Utf8View)
64 }
65
66 fn signature(&self) -> &Signature {
67 &self.signature
68 }
69
70 fn invoke_with_args(
71 &self,
72 args: ScalarFunctionArgs,
73 ) -> datafusion_common::Result<ColumnarValue> {
74 let ctx = find_function_context(&args)?;
75 let timezone = &ctx.query_ctx.timezone();
76
77 let [left, arg1] = extract_args(self.name(), &args)?;
78 let formats = arg1.as_string::<i32>();
79
80 let size = left.len();
81 let left_datatype = left.data_type();
82 let mut builder = StringViewBuilder::with_capacity(size);
83
84 match left_datatype {
85 DataType::Timestamp(_, _) => {
86 with_match_timestamp_types!(left_datatype, |$S| {
87 let array = left.as_primitive::<$S>();
88 for (date, format) in array.iter().zip(formats.iter()) {
89 let result = match (date, format) {
90 (Some(date), Some(format)) => {
91 let ts = Timestamp::new(date, $S::UNIT.into());
92 let x = ts.as_formatted_string(&format, Some(timezone))
93 .map_err(|e| DataFusionError::Execution(format!(
94 "cannot format {ts:?} as '{format}': {e}"
95 )))?;
96 Some(x)
97 }
98 _ => None
99 };
100 builder.append_option(result.as_deref());
101 }
102 })?;
103 }
104 DataType::Date32 => {
105 let left = left.as_primitive::<Date32Type>();
106 for i in 0..size {
107 let date = left.is_valid(i).then(|| Date::from(left.value(i)));
108 let format = formats.is_valid(i).then(|| formats.value(i));
109
110 let result = match (date, format) {
111 (Some(date), Some(fmt)) => date
112 .as_formatted_string(fmt, Some(timezone))
113 .map_err(BoxedError::new)
114 .context(error::ExecuteSnafu)?,
115 _ => None,
116 };
117
118 builder.append_option(result.as_deref());
119 }
120 }
121 DataType::Date64 => {
122 let left = left.as_primitive::<Date64Type>();
123 for i in 0..size {
124 let date = left.is_valid(i).then(|| {
125 let ms = left.value(i);
126 Timestamp::new_millisecond(ms)
127 });
128 let format = formats.is_valid(i).then(|| formats.value(i));
129
130 let result = match (date, format) {
131 (Some(ts), Some(fmt)) => {
132 Some(ts.as_formatted_string(fmt, Some(timezone)).map_err(|e| {
133 DataFusionError::Execution(format!(
134 "cannot format {ts:?} as '{fmt}': {e}"
135 ))
136 })?)
137 }
138 _ => None,
139 };
140
141 builder.append_option(result.as_deref());
142 }
143 }
144 x => {
145 return Err(DataFusionError::Execution(format!(
146 "unsupported input data type {x}"
147 )));
148 }
149 }
150
151 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
152 }
153}
154
155impl fmt::Display for DateFormatFunction {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 write!(f, "DATE_FORMAT")
158 }
159}
160
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::{
        Date32Array, Date64Array, StringArray, TimestampSecondArray,
    };
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::{TypeSignature, Volatility};

    use super::*;
    use crate::function::FunctionContext;

    /// Format string applied to every row in the invocation tests.
    const PATTERN: &str = "%Y-%m-%d %T.%3f";

    /// Runs `date_format` over `input`, pairing every row with [`PATTERN`],
    /// and checks the produced strings against `expected` row by row.
    fn assert_formatted(input: Arc<dyn Array>, expected: &[Option<&str>]) {
        let function = DateFormatFunction::default();
        let row_count = expected.len();
        let formats = StringArray::from_iter_values(vec![PATTERN; row_count]);

        // The function looks up its context in the config extensions.
        let mut config_options = ConfigOptions::default();
        config_options.extensions.insert(FunctionContext::default());
        let config_options = Arc::new(config_options);

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(input),
                ColumnarValue::Array(Arc::new(formats)),
            ],
            arg_fields: vec![],
            number_rows: row_count,
            return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
            config_options,
        };

        let output = function
            .invoke_with_args(args)
            .and_then(|value| value.to_array(row_count))
            .unwrap();
        let strings = output.as_string_view();

        assert_eq!(row_count, strings.len());
        for (actual, expect) in strings.iter().zip(expected.iter().copied()) {
            assert_eq!(actual, expect);
        }
    }

    #[test]
    fn test_date_format_misc() {
        let f = DateFormatFunction::default();
        assert_eq!("date_format", f.name());

        // The return type does not depend on the input type.
        let input_types = [
            DataType::Timestamp(TimeUnit::Microsecond, None),
            DataType::Timestamp(TimeUnit::Second, None),
            DataType::Date32,
        ];
        for input_type in input_types {
            assert_eq!(DataType::Utf8View, f.return_type(&[input_type]).unwrap());
        }

        let signature_matches = matches!(
            f.signature(),
            Signature {
                type_signature: TypeSignature::OneOf(sigs),
                volatility: Volatility::Immutable
            } if sigs.len() == 6
        );
        assert!(signature_matches);
    }

    #[test]
    fn test_timestamp_date_format() {
        let times = vec![Some(123), None, Some(42), None];
        let expected = [
            Some("1970-01-01 00:02:03.000"),
            None,
            Some("1970-01-01 00:00:42.000"),
            None,
        ];

        assert_formatted(Arc::new(TimestampSecondArray::from(times)), &expected);
    }

    #[test]
    fn test_date64_date_format() {
        let dates = vec![Some(123000), None, Some(42000), None];
        let expected = [
            Some("1970-01-01 00:02:03.000"),
            None,
            Some("1970-01-01 00:00:42.000"),
            None,
        ];

        assert_formatted(Arc::new(Date64Array::from(dates)), &expected);
    }

    #[test]
    fn test_date_date_format() {
        let dates = vec![Some(123), None, Some(42), None];
        let expected = [
            Some("1970-05-04 00:00:00.000"),
            None,
            Some("1970-02-12 00:00:00.000"),
            None,
        ];

        assert_formatted(Arc::new(Date32Array::from(dates)), &expected);
    }
}