1use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use arrow::compute;
19use datafusion_common::arrow::array::{
20 Array, AsArray, BooleanBuilder, Float64Builder, Int64Builder, StringViewBuilder,
21};
22use datafusion_common::arrow::datatypes::DataType;
23use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
24
25use crate::function::{Function, extract_args};
26use crate::helper;
27
28fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
29 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
30 match json_path {
31 Ok(json_path) => {
32 let mut sub_jsonb = Vec::new();
33 let mut sub_offsets = Vec::new();
34 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
35 Ok(_) => Some(sub_jsonb),
36 Err(_) => None,
37 }
38 }
39 _ => None,
40 }
41}
42
// Generates a scalar function struct that looks up a value inside a JSONB document by a
// JSONPath and converts it to a typed Arrow column.
//
// * `$name`      - name of the generated struct; its SQL name is `$name` in snake_case.
// * `$type`      - `DataType` variant returned; `[<$type Builder>]` must be an Arrow builder
//                  in scope (e.g. `Int64` -> `Int64Builder`).
// * `$rust_type` - suffix of the jsonb conversion used (e.g. `i64` -> `jsonb::to_i64`).
// * `$doc`       - doc string attached to the generated struct.
macro_rules! json_get {
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug)]
            pub struct $name {
                signature: Signature,
            }

            impl $name {
                /// SQL-facing function name (snake_case form of the struct name).
                pub const NAME: &'static str = stringify!([<$name:snake>]);
            }

            impl Default for $name {
                fn default() -> Self {
                    // First argument: JSONB bytes; second argument: the JSONPath text.
                    let json_types = vec![DataType::Binary, DataType::BinaryView];
                    let path_types = vec![DataType::Utf8, DataType::Utf8View];
                    Self {
                        signature: helper::one_of_sigs2(json_types, path_types),
                    }
                }
            }

            impl Function for $name {
                fn name(&self) -> &str {
                    Self::NAME
                }

                fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                    Ok(DataType::[<$type>])
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn invoke_with_args(
                    &self,
                    args: ScalarFunctionArgs,
                ) -> datafusion_common::Result<ColumnarValue> {
                    let [json_arg, path_arg] = extract_args(self.name(), &args)?;
                    // Normalize to view types so a single code path serves both the
                    // plain and the view variants accepted by the signature.
                    let json_array = compute::cast(&json_arg, &DataType::BinaryView)?;
                    let path_array = compute::cast(&path_arg, &DataType::Utf8View)?;
                    let jsons = json_array.as_binary_view();
                    let paths = path_array.as_string_view();

                    let row_count = jsons.len();
                    let mut builder = [<$type Builder>]::with_capacity(row_count);
                    for row in 0..row_count {
                        // A null document or null path yields a null output row; a failed
                        // lookup or a failed conversion also yields null.
                        let extracted = match (jsons.is_valid(row), paths.is_valid(row)) {
                            (true, true) => get_json_by_path(jsons.value(row), paths.value(row))
                                .and_then(|sub_json| jsonb::[<to_ $rust_type>](&sub_json).ok()),
                            _ => None,
                        };
                        builder.append_option(extracted);
                    }

                    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
                }
            }

            impl Display for $name {
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    let upper = Self::NAME.to_ascii_uppercase();
                    f.write_str(&upper)
                }
            }
        }
    };
}
123
// `json_get_int`: returns an `Int64` column, converting the selected value with `jsonb::to_i64`.
json_get!(
    JsonGetInt,
    Int64,
    i64,
    "Get the value from the JSONB by the given path and return it as an integer."
);

// `json_get_float`: returns a `Float64` column, converting the selected value with `jsonb::to_f64`.
json_get!(
    JsonGetFloat,
    Float64,
    f64,
    "Get the value from the JSONB by the given path and return it as a float."
);

// `json_get_bool`: returns a `Boolean` column, converting the selected value with `jsonb::to_bool`.
json_get!(
    JsonGetBool,
    Boolean,
    bool,
    "Get the value from the JSONB by the given path and return it as a boolean."
);
144
145#[derive(Clone, Debug)]
147pub struct JsonGetString {
148 signature: Signature,
149}
150
151impl JsonGetString {
152 pub const NAME: &'static str = "json_get_string";
153}
154
155impl Default for JsonGetString {
156 fn default() -> Self {
157 Self {
158 signature: helper::one_of_sigs2(
160 vec![DataType::Binary, DataType::BinaryView],
161 vec![DataType::Utf8, DataType::Utf8View],
162 ),
163 }
164 }
165}
166
167impl Function for JsonGetString {
168 fn name(&self) -> &str {
169 Self::NAME
170 }
171
172 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
173 Ok(DataType::Utf8View)
174 }
175
176 fn signature(&self) -> &Signature {
177 &self.signature
178 }
179
180 fn invoke_with_args(
181 &self,
182 args: ScalarFunctionArgs,
183 ) -> datafusion_common::Result<ColumnarValue> {
184 let [arg0, arg1] = extract_args(self.name(), &args)?;
185 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
186 let jsons = arg0.as_binary_view();
187 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
188 let paths = arg1.as_string_view();
189
190 let size = jsons.len();
191 let mut builder = StringViewBuilder::with_capacity(size);
192
193 for i in 0..size {
194 let json = jsons.is_valid(i).then(|| jsons.value(i));
195 let path = paths.is_valid(i).then(|| paths.value(i));
196 let result = match (json, path) {
197 (Some(json), Some(path)) => {
198 get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
199 }
200 _ => None,
201 };
202 builder.append_option(result);
203 }
204
205 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
206 }
207}
208
209impl Display for JsonGetString {
210 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
211 write!(f, "{}", Self::NAME.to_ascii_uppercase())
212 }
213}
214
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::{ArrayRef, BinaryArray, StringArray};
    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};

    use super::*;

    /// Encodes each JSON text as JSONB, pairs it with the path at the same index, invokes
    /// `function` on the two columns and returns the materialized result column.
    fn invoke_json_get<F: Function>(
        function: &F,
        json_strings: &[&str],
        paths: &[&str],
        return_type: DataType,
    ) -> ArrayRef {
        let rows = json_strings.len();
        let jsonbs = json_strings
            .iter()
            .map(|s| jsonb::parse_value(s.as_bytes()).unwrap().to_vec())
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(
                    paths.iter().copied(),
                ))),
            ],
            arg_fields: vec![],
            number_rows: rows,
            // The functions emit null rows for unmatched paths, so the field must be nullable.
            return_field: Arc::new(Field::new("x", return_type, true)),
            config_options: Arc::new(Default::default()),
        };
        function
            .invoke_with_args(args)
            .and_then(|x| x.to_array(rows))
            .unwrap()
    }

    #[test]
    fn test_json_get_int() {
        let json_get_int = JsonGetInt::default();

        assert_eq!("json_get_int", json_get_int.name());
        assert_eq!(
            DataType::Int64,
            json_get_int
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c"];
        let expected: Vec<Option<i64>> = vec![Some(2), Some(4), None];

        let result = invoke_json_get(&json_get_int, &json_strings, &paths, DataType::Int64);
        let actual = result
            .as_primitive::<Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_json_get_float() {
        let json_get_float = JsonGetFloat::default();

        assert_eq!("json_get_float", json_get_float.name());
        assert_eq!(
            DataType::Float64,
            json_get_float
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c"];
        let expected: Vec<Option<f64>> = vec![Some(2.1), Some(4.4), None];

        let result = invoke_json_get(&json_get_float, &json_strings, &paths, DataType::Float64);
        let actual = result
            .as_primitive::<Float64Type>()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_json_get_bool() {
        let json_get_bool = JsonGetBool::default();

        assert_eq!("json_get_bool", json_get_bool.name());
        assert_eq!(
            DataType::Boolean,
            json_get_bool
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": true}, "b": false, "c": true}"#,
            r#"{"a": false, "b": {"c": true}, "c": false}"#,
            r#"{"a": true, "b": false, "c": {"a": true}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c"];
        let expected: Vec<Option<bool>> = vec![Some(true), Some(false), None];

        let result = invoke_json_get(&json_get_bool, &json_strings, &paths, DataType::Boolean);
        let actual = result.as_boolean().iter().collect::<Vec<_>>();
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_json_get_string() {
        let json_get_string = JsonGetString::default();

        assert_eq!("json_get_string", json_get_string.name());
        assert_eq!(
            DataType::Utf8View,
            json_get_string
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
        ];
        let paths = ["$.a.b", "$.a", ""];
        let expected: Vec<Option<&str>> = vec![Some("a"), Some("d"), None];

        let result = invoke_json_get(&json_get_string, &json_strings, &paths, DataType::Utf8View);
        let actual = result.as_string_view().iter().collect::<Vec<_>>();
        assert_eq!(expected, actual);
    }
}