common_function/scalars/ip/
ipv6.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::{Ipv4Addr, Ipv6Addr};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use common_query::error::InvalidFuncArgsSnafu;
20use datafusion::arrow::datatypes::DataType;
21use datafusion_common::DataFusionError;
22use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder, StringViewBuilder};
23use datafusion_common::arrow::compute;
24use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
25use derive_more::Display;
26
27use crate::function::{Function, extract_args};
28
29/// Function that converts a hex string representation of an IPv6 address to a formatted string.
30///
31/// For example:
32/// - "20010DB8000000000000000000000001" returns "2001:db8::1"
33/// - "00000000000000000000FFFFC0A80001" returns "::ffff:192.168.0.1"
34#[derive(Clone, Debug, Display)]
35#[display("{}", self.name())]
36pub(crate) struct Ipv6NumToString {
37    signature: Signature,
38}
39
40impl Default for Ipv6NumToString {
41    fn default() -> Self {
42        Self {
43            signature: Signature::string(1, Volatility::Immutable),
44        }
45    }
46}
47
48impl Function for Ipv6NumToString {
49    fn name(&self) -> &str {
50        "ipv6_num_to_string"
51    }
52
53    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
54        Ok(DataType::Utf8View)
55    }
56
57    fn signature(&self) -> &Signature {
58        &self.signature
59    }
60
61    fn invoke_with_args(
62        &self,
63        args: ScalarFunctionArgs,
64    ) -> datafusion_common::Result<ColumnarValue> {
65        let [arg0] = extract_args(self.name(), &args)?;
66
67        let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
68        let hex_vec = arg0.as_string_view();
69        let size = hex_vec.len();
70        let mut builder = StringViewBuilder::with_capacity(size);
71
72        for i in 0..size {
73            let hex_str = hex_vec.is_valid(i).then(|| hex_vec.value(i));
74            let ip_str = match hex_str {
75                Some(s) => {
76                    let hex_str = s.to_lowercase();
77
78                    // Validate and convert hex string to bytes
79                    let bytes = if hex_str.len() == 32 {
80                        let mut bytes = [0u8; 16];
81                        for i in 0..16 {
82                            let byte_str = &hex_str[i * 2..i * 2 + 2];
83                            bytes[i] = u8::from_str_radix(byte_str, 16).map_err(|_| {
84                                InvalidFuncArgsSnafu {
85                                    err_msg: format!("Invalid hex characters in '{}'", byte_str),
86                                }
87                                .build()
88                            })?;
89                        }
90                        bytes
91                    } else {
92                        return Err(DataFusionError::Execution(format!(
93                            "expecting 32 hex characters, got {}",
94                            hex_str.len()
95                        )));
96                    };
97
98                    // Convert bytes to IPv6 address
99                    let addr = Ipv6Addr::from(bytes);
100
101                    // Special handling for IPv6-mapped IPv4 addresses
102                    if let Some(ipv4) = addr.to_ipv4() {
103                        if addr.octets()[0..10].iter().all(|&b| b == 0)
104                            && addr.octets()[10] == 0xFF
105                            && addr.octets()[11] == 0xFF
106                        {
107                            Some(format!("::ffff:{}", ipv4))
108                        } else {
109                            Some(addr.to_string())
110                        }
111                    } else {
112                        Some(addr.to_string())
113                    }
114                }
115                _ => None,
116            };
117
118            builder.append_option(ip_str.as_deref());
119        }
120
121        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
122    }
123}
124
125/// Function that converts a string representation of an IPv6 address to its binary representation.
126///
127/// For example:
128/// - "2001:db8::1" returns its binary representation
129/// - If the input string contains a valid IPv4 address, returns its IPv6 equivalent
130/// - HEX can be uppercase or lowercase
131/// - Invalid IPv6 format throws an exception
132#[derive(Clone, Debug, Display)]
133#[display("{}", self.name())]
134pub(crate) struct Ipv6StringToNum {
135    signature: Signature,
136}
137
138impl Default for Ipv6StringToNum {
139    fn default() -> Self {
140        Self {
141            signature: Signature::string(1, Volatility::Immutable),
142        }
143    }
144}
145
146impl Function for Ipv6StringToNum {
147    fn name(&self) -> &str {
148        "ipv6_string_to_num"
149    }
150
151    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
152        Ok(DataType::BinaryView)
153    }
154
155    fn signature(&self) -> &Signature {
156        &self.signature
157    }
158
159    fn invoke_with_args(
160        &self,
161        args: ScalarFunctionArgs,
162    ) -> datafusion_common::Result<ColumnarValue> {
163        let [arg0] = extract_args(self.name(), &args)?;
164        let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
165        let ip_vec = arg0.as_string_view();
166
167        let size = ip_vec.len();
168        let mut builder = BinaryViewBuilder::with_capacity(size);
169
170        for i in 0..size {
171            let ip_str = ip_vec.is_valid(i).then(|| ip_vec.value(i));
172            let ip_binary = match ip_str {
173                Some(addr_str) => {
174                    let addr = if let Ok(ipv6) = Ipv6Addr::from_str(addr_str) {
175                        // Direct IPv6 address
176                        ipv6
177                    } else if let Ok(ipv4) = Ipv4Addr::from_str(addr_str) {
178                        // IPv4 address to be converted to IPv6
179                        ipv4.to_ipv6_mapped()
180                    } else {
181                        // Invalid format
182                        return Err(DataFusionError::Execution(format!(
183                            "Invalid IPv6 address format: {}",
184                            addr_str
185                        )));
186                    };
187
188                    // Convert IPv6 address to binary (16 bytes)
189                    let octets = addr.octets();
190                    Some(octets.to_vec())
191                }
192                _ => None,
193            };
194
195            builder.append_option(ip_binary.as_deref());
196        }
197
198        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::fmt::Write;
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::StringViewArray;

    use super::*;

    /// Hex digits of "2001:db8::1".
    const DOC_ADDR_HEX: &str = "20010db8000000000000000000000001";
    /// Hex digits of the IPv4-mapped address "::ffff:192.168.0.1".
    const MAPPED_ADDR_HEX: &str = "00000000000000000000ffffc0a80001";

    /// Binary form of "2001:db8::1".
    const DOC_ADDR_BYTES: [u8; 16] = [
        0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01,
    ];
    /// Binary form of "::ffff:192.168.0.1" (also what "192.168.0.1" maps to).
    const MAPPED_ADDR_BYTES: [u8; 16] = [
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0xC0, 0xA8, 0, 0x01,
    ];

    /// Wraps string values into a single `Utf8View` array argument.
    fn string_column(values: &[&str]) -> ColumnarValue {
        ColumnarValue::Array(Arc::new(StringViewArray::from_iter_values(values)))
    }

    /// Invokes `func` on one argument column, declaring `number_rows` rows and a return field
    /// of `return_type`.
    fn invoke(
        func: &impl Function,
        arg0: ColumnarValue,
        number_rows: usize,
        return_type: DataType,
    ) -> datafusion_common::Result<ColumnarValue> {
        func.invoke_with_args(ScalarFunctionArgs {
            args: vec![arg0],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("x", return_type, false)),
            config_options: Arc::new(Default::default()),
        })
    }

    #[test]
    fn test_ipv6_num_to_string() {
        let func = Ipv6NumToString::default();
        let values = [DOC_ADDR_HEX, MAPPED_ADDR_HEX];

        let result = invoke(&func, string_column(&values), values.len(), DataType::Utf8View)
            .unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_string_view();

        assert_eq!(result.value(0), "2001:db8::1");
        assert_eq!(result.value(1), "::ffff:192.168.0.1");
    }

    #[test]
    fn test_ipv6_num_to_string_uppercase() {
        let func = Ipv6NumToString::default();

        // Uppercase hex string for "2001:db8::1"
        let values = ["20010DB8000000000000000000000001"];

        let result = invoke(&func, string_column(&values), values.len(), DataType::Utf8View)
            .unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_string_view();

        assert_eq!(result.value(0), "2001:db8::1");
    }

    #[test]
    fn test_ipv6_num_to_string_error() {
        let func = Ipv6NumToString::default();

        // Invalid hex string - wrong length
        let values = ["20010db8"];

        let result = invoke(&func, string_column(&values), values.len(), DataType::Utf8View);
        assert!(result.is_err());

        // Check that the error message contains expected text
        let error_msg = result.unwrap_err().to_string();
        assert_eq!(
            error_msg,
            "Execution error: expecting 32 hex characters, got 8"
        );
    }

    #[test]
    fn test_ipv6_string_to_num() {
        let func = Ipv6StringToNum::default();
        let values = ["2001:db8::1", "::ffff:192.168.0.1", "192.168.0.1"];

        let result = invoke(&func, string_column(&values), values.len(), DataType::BinaryView)
            .unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_binary_view();

        assert_eq!(result.value(0), &DOC_ADDR_BYTES);
        // Both the IPv4-mapped literal and the plain IPv4 address map to the same bytes.
        assert_eq!(result.value(1), &MAPPED_ADDR_BYTES);
        assert_eq!(result.value(2), &MAPPED_ADDR_BYTES);
    }

    #[test]
    fn test_ipv6_string_to_num_error() {
        let func = Ipv6StringToNum::default();

        // Neither a valid IPv6 nor a valid IPv4 address
        let values = ["not-an-ip"];

        let result = invoke(&func, string_column(&values), values.len(), DataType::BinaryView);
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert_eq!(
            error_msg,
            "Execution error: Invalid IPv6 address format: not-an-ip"
        );
    }

    #[test]
    fn test_ipv6_conversions_roundtrip() {
        let to_num = Ipv6StringToNum::default();
        let to_string = Ipv6NumToString::default();

        let values = ["2001:db8::1", "::ffff:192.168.0.1"];

        // Convert IPv6 addresses to binary
        let binary = invoke(&to_num, string_column(&values), values.len(), DataType::BinaryView)
            .unwrap();
        let binary = binary.to_array(values.len()).unwrap();
        let binary = binary.as_binary_view();

        // Convert binary to hex string representation (the input format of ipv6_num_to_string)
        let hex_strings: Vec<String> = (0..binary.len())
            .map(|i| {
                binary.value(i).iter().fold(String::new(), |mut acc, b| {
                    write!(&mut acc, "{:02x}", b).unwrap();
                    acc
                })
            })
            .collect();
        let hex_str_refs: Vec<&str> = hex_strings.iter().map(|s| s.as_str()).collect();

        // Now convert hex to formatted string
        let result = invoke(
            &to_string,
            string_column(&hex_str_refs),
            hex_str_refs.len(),
            DataType::Utf8View,
        )
        .unwrap();
        let result = result.to_array(values.len()).unwrap();
        let result = result.as_string_view();

        // Compare with original input
        assert_eq!(result.value(0), values[0]);
        assert_eq!(result.value(1), values[1]);
    }

    #[test]
    fn test_ipv6_conversions_hex_roundtrip() {
        // Verify that the string output of ipv6_num_to_string can be converted back
        // using ipv6_string_to_num.
        let to_string = Ipv6NumToString::default();
        let to_binary = Ipv6StringToNum::default();

        let hex_values = [DOC_ADDR_HEX, MAPPED_ADDR_HEX];

        // Convert hex to string representation
        let formatted = invoke(
            &to_string,
            string_column(&hex_values),
            hex_values.len(),
            DataType::Utf8View,
        )
        .unwrap();

        // Then convert string representation back to binary
        let result = invoke(&to_binary, formatted, hex_values.len(), DataType::BinaryView)
            .unwrap();
        let result = result.to_array(hex_values.len()).unwrap();
        let result = result.as_binary_view();

        assert_eq!(result.value(0), &DOC_ADDR_BYTES);
        assert_eq!(result.value(1), &MAPPED_ADDR_BYTES);
    }
}