1use std::net::{Ipv4Addr, Ipv6Addr};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use common_query::error::InvalidFuncArgsSnafu;
20use datafusion::arrow::datatypes::DataType;
21use datafusion_common::DataFusionError;
22use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder, StringViewBuilder};
23use datafusion_common::arrow::compute;
24use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
25use derive_more::Display;
26
27use crate::function::{Function, extract_args};
28
/// Scalar function `ipv6_num_to_string`.
///
/// Takes a single string argument holding the 32-character hexadecimal
/// encoding of an IPv6 address (16 bytes, two hex digits per byte, upper or
/// lower case) and returns the address in textual form as `Utf8View`.
///
/// Examples:
/// - `"20010db8000000000000000000000001"` -> `"2001:db8::1"`
/// - `"00000000000000000000ffffc0a80001"` -> `"::ffff:192.168.0.1"`
#[derive(Clone, Debug, Display)]
#[display("{}", self.name())]
pub(crate) struct Ipv6NumToString {
    /// Argument signature advertised to the query planner.
    signature: Signature,
}
39
40impl Default for Ipv6NumToString {
41 fn default() -> Self {
42 Self {
43 signature: Signature::string(1, Volatility::Immutable),
44 }
45 }
46}
47
48impl Function for Ipv6NumToString {
49 fn name(&self) -> &str {
50 "ipv6_num_to_string"
51 }
52
53 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
54 Ok(DataType::Utf8View)
55 }
56
57 fn signature(&self) -> &Signature {
58 &self.signature
59 }
60
61 fn invoke_with_args(
62 &self,
63 args: ScalarFunctionArgs,
64 ) -> datafusion_common::Result<ColumnarValue> {
65 let [arg0] = extract_args(self.name(), &args)?;
66
67 let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
68 let hex_vec = arg0.as_string_view();
69 let size = hex_vec.len();
70 let mut builder = StringViewBuilder::with_capacity(size);
71
72 for i in 0..size {
73 let hex_str = hex_vec.is_valid(i).then(|| hex_vec.value(i));
74 let ip_str = match hex_str {
75 Some(s) => {
76 let hex_str = s.to_lowercase();
77
78 let bytes = if hex_str.len() == 32 {
80 let mut bytes = [0u8; 16];
81 for i in 0..16 {
82 let byte_str = &hex_str[i * 2..i * 2 + 2];
83 bytes[i] = u8::from_str_radix(byte_str, 16).map_err(|_| {
84 InvalidFuncArgsSnafu {
85 err_msg: format!("Invalid hex characters in '{}'", byte_str),
86 }
87 .build()
88 })?;
89 }
90 bytes
91 } else {
92 return Err(DataFusionError::Execution(format!(
93 "expecting 32 hex characters, got {}",
94 hex_str.len()
95 )));
96 };
97
98 let addr = Ipv6Addr::from(bytes);
100
101 if let Some(ipv4) = addr.to_ipv4() {
103 if addr.octets()[0..10].iter().all(|&b| b == 0)
104 && addr.octets()[10] == 0xFF
105 && addr.octets()[11] == 0xFF
106 {
107 Some(format!("::ffff:{}", ipv4))
108 } else {
109 Some(addr.to_string())
110 }
111 } else {
112 Some(addr.to_string())
113 }
114 }
115 _ => None,
116 };
117
118 builder.append_option(ip_str.as_deref());
119 }
120
121 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
122 }
123}
124
/// Scalar function `ipv6_string_to_num`.
///
/// Takes a single string argument holding a textual IP address and returns
/// its 16-byte binary form as `BinaryView`. Input that parses as an IPv6
/// address is used as is; input that parses as an IPv4 address is converted
/// to the corresponding IPv4-mapped IPv6 address (`::ffff:a.b.c.d`).
#[derive(Clone, Debug, Display)]
#[display("{}", self.name())]
pub(crate) struct Ipv6StringToNum {
    /// Argument signature advertised to the query planner.
    signature: Signature,
}
137
138impl Default for Ipv6StringToNum {
139 fn default() -> Self {
140 Self {
141 signature: Signature::string(1, Volatility::Immutable),
142 }
143 }
144}
145
146impl Function for Ipv6StringToNum {
147 fn name(&self) -> &str {
148 "ipv6_string_to_num"
149 }
150
151 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
152 Ok(DataType::BinaryView)
153 }
154
155 fn signature(&self) -> &Signature {
156 &self.signature
157 }
158
159 fn invoke_with_args(
160 &self,
161 args: ScalarFunctionArgs,
162 ) -> datafusion_common::Result<ColumnarValue> {
163 let [arg0] = extract_args(self.name(), &args)?;
164 let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
165 let ip_vec = arg0.as_string_view();
166
167 let size = ip_vec.len();
168 let mut builder = BinaryViewBuilder::with_capacity(size);
169
170 for i in 0..size {
171 let ip_str = ip_vec.is_valid(i).then(|| ip_vec.value(i));
172 let ip_binary = match ip_str {
173 Some(addr_str) => {
174 let addr = if let Ok(ipv6) = Ipv6Addr::from_str(addr_str) {
175 ipv6
177 } else if let Ok(ipv4) = Ipv4Addr::from_str(addr_str) {
178 ipv4.to_ipv6_mapped()
180 } else {
181 return Err(DataFusionError::Execution(format!(
183 "Invalid IPv6 address format: {}",
184 addr_str
185 )));
186 };
187
188 let octets = addr.octets();
190 Some(octets.to_vec())
191 }
192 _ => None,
193 };
194
195 builder.append_option(ip_binary.as_deref());
196 }
197
198 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::fmt::Write;
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::StringViewArray;

    use super::*;

    /// 16-byte form of `2001:db8::1`.
    const DOC_ADDR_OCTETS: [u8; 16] = [
        0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01,
    ];

    /// 16-byte form of `::ffff:192.168.0.1` (IPv4-mapped).
    const MAPPED_V4_OCTETS: [u8; 16] = [
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0xC0, 0xA8, 0, 0x01,
    ];

    /// Wraps string literals into a non-null `Utf8View` array argument.
    fn string_column(values: &[&str]) -> ColumnarValue {
        ColumnarValue::Array(Arc::new(StringViewArray::from_iter_values(values)))
    }

    /// Builds the argument bundle for invoking a one-argument function.
    fn single_arg(
        arg: ColumnarValue,
        number_rows: usize,
        return_type: DataType,
    ) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![arg],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("x", return_type, false)),
            config_options: Arc::new(Default::default()),
        }
    }

    #[test]
    fn test_ipv6_num_to_string() {
        let func = Ipv6NumToString::default();

        // A regular IPv6 address followed by an IPv4-mapped one.
        let values = [
            "20010db8000000000000000000000001",
            "00000000000000000000ffffc0a80001",
        ];

        let args = single_arg(string_column(&values), values.len(), DataType::Utf8View);
        let result = func.invoke_with_args(args).unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_string_view();

        assert_eq!(result.value(0), "2001:db8::1");
        assert_eq!(result.value(1), "::ffff:192.168.0.1");
    }

    #[test]
    fn test_ipv6_num_to_string_uppercase() {
        let func = Ipv6NumToString::default();

        // Upper-case hex digits must be accepted as well.
        let values = ["20010DB8000000000000000000000001"];

        let args = single_arg(string_column(&values), values.len(), DataType::Utf8View);
        let result = func.invoke_with_args(args).unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_string_view();

        assert_eq!(result.value(0), "2001:db8::1");
    }

    #[test]
    fn test_ipv6_num_to_string_null() {
        let func = Ipv6NumToString::default();

        let array = StringViewArray::from(vec![Some("20010db8000000000000000000000001"), None]);
        let args = single_arg(
            ColumnarValue::Array(Arc::new(array)),
            2,
            DataType::Utf8View,
        );
        let result = func.invoke_with_args(args).unwrap();
        let result = result.to_array(2).unwrap();
        let result = result.as_string_view();

        assert_eq!(result.value(0), "2001:db8::1");
        assert!(result.is_null(1));
    }

    #[test]
    fn test_ipv6_num_to_string_error() {
        let func = Ipv6NumToString::default();

        // Too short to be a full IPv6 address.
        let values = ["20010db8"];

        let args = single_arg(string_column(&values), values.len(), DataType::Utf8View);
        let result = func.invoke_with_args(args);
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert_eq!(
            error_msg,
            "Execution error: expecting 32 hex characters, got 8"
        );
    }

    #[test]
    fn test_ipv6_string_to_num() {
        let func = Ipv6StringToNum::default();

        // The plain IPv4 literal must map to the same bytes as its `::ffff:` form.
        let values = ["2001:db8::1", "::ffff:192.168.0.1", "192.168.0.1"];

        let args = single_arg(string_column(&values), values.len(), DataType::BinaryView);
        let result = func.invoke_with_args(args).unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_binary_view();

        assert_eq!(result.value(0), &DOC_ADDR_OCTETS);
        assert_eq!(result.value(1), &MAPPED_V4_OCTETS);
        assert_eq!(result.value(2), &MAPPED_V4_OCTETS);
    }

    #[test]
    fn test_ipv6_string_to_num_invalid() {
        let func = Ipv6StringToNum::default();

        let values = ["not-an-ip"];

        let args = single_arg(string_column(&values), values.len(), DataType::BinaryView);
        let result = func.invoke_with_args(args);
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert_eq!(
            error_msg,
            "Execution error: Invalid IPv6 address format: not-an-ip"
        );
    }

    #[test]
    fn test_ipv6_conversions_roundtrip() {
        let to_num = Ipv6StringToNum::default();
        let to_string = Ipv6NumToString::default();

        let values = ["2001:db8::1", "::ffff:192.168.0.1"];

        // Text -> binary.
        let args = single_arg(string_column(&values), values.len(), DataType::BinaryView);
        let result = to_num.invoke_with_args(args).unwrap();
        let result = result.to_array(values.len()).unwrap();
        let binary_vector = result.as_binary_view();

        // Binary -> hex text, the input format of `ipv6_num_to_string`.
        let mut hex_strings = Vec::new();
        for i in 0..binary_vector.len() {
            let bytes = binary_vector.value(i);
            let hex = bytes.iter().fold(String::new(), |mut acc, b| {
                write!(&mut acc, "{:02x}", b).unwrap();
                acc
            });
            hex_strings.push(hex);
        }
        let hex_str_refs: Vec<&str> = hex_strings.iter().map(|s| s.as_str()).collect();

        // Hex text -> text; must reproduce the original input.
        let args = single_arg(
            string_column(&hex_str_refs),
            hex_str_refs.len(),
            DataType::Utf8View,
        );
        let result = to_string.invoke_with_args(args).unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_string_view();

        assert_eq!(result.value(0), values[0]);
        assert_eq!(result.value(1), values[1]);
    }

    #[test]
    fn test_ipv6_conversions_hex_roundtrip() {
        let to_string = Ipv6NumToString::default();
        let to_binary = Ipv6StringToNum::default();

        let hex_values = [
            "20010db8000000000000000000000001",
            "00000000000000000000ffffc0a80001",
        ];

        // Hex text -> text.
        let args = single_arg(
            string_column(&hex_values),
            hex_values.len(),
            DataType::Utf8View,
        );
        let result = to_string.invoke_with_args(args).unwrap();

        // Text -> binary, feeding the previous output straight back in.
        let args = single_arg(result, hex_values.len(), DataType::BinaryView);
        let result = to_binary.invoke_with_args(args).unwrap();
        let result = result.to_array(hex_values.len()).unwrap();
        let result = result.as_binary_view();

        assert_eq!(result.value(0), &DOC_ADDR_OCTETS);
        assert_eq!(result.value(1), &MAPPED_V4_OCTETS);
    }
}