common_function/scalars/ip/
ipv6.rs1use std::net::{Ipv4Addr, Ipv6Addr};
16use std::str::FromStr;
17
18use common_query::error::{InvalidFuncArgsSnafu, Result};
19use datafusion::arrow::datatypes::DataType;
20use datafusion_expr::{Signature, Volatility};
21use datatypes::prelude::Value;
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{BinaryVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
24use derive_more::Display;
25use snafu::ensure;
26
27use crate::function::{Function, FunctionContext};
28
29#[derive(Clone, Debug, Default, Display)]
35#[display("{}", self.name())]
36pub struct Ipv6NumToString;
37
38impl Function for Ipv6NumToString {
39 fn name(&self) -> &str {
40 "ipv6_num_to_string"
41 }
42
43 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
44 Ok(DataType::Utf8)
45 }
46
47 fn signature(&self) -> Signature {
48 Signature::string(1, Volatility::Immutable)
49 }
50
51 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
52 ensure!(
53 columns.len() == 1,
54 InvalidFuncArgsSnafu {
55 err_msg: format!("Expected 1 argument, got {}", columns.len())
56 }
57 );
58
59 let hex_vec = &columns[0];
60 let size = hex_vec.len();
61 let mut results = StringVectorBuilder::with_capacity(size);
62
63 for i in 0..size {
64 let hex_str = hex_vec.get(i);
65 let ip_str = match hex_str {
66 Value::String(s) => {
67 let hex_str = s.as_utf8().to_lowercase();
68
69 let bytes = if hex_str.len() == 32 {
71 let mut bytes = [0u8; 16];
72 for i in 0..16 {
73 let byte_str = &hex_str[i * 2..i * 2 + 2];
74 bytes[i] = u8::from_str_radix(byte_str, 16).map_err(|_| {
75 InvalidFuncArgsSnafu {
76 err_msg: format!("Invalid hex characters in '{}'", byte_str),
77 }
78 .build()
79 })?;
80 }
81 bytes
82 } else {
83 return InvalidFuncArgsSnafu {
84 err_msg: format!("Expected 32 hex characters, got {}", hex_str.len()),
85 }
86 .fail();
87 };
88
89 let addr = Ipv6Addr::from(bytes);
91
92 if let Some(ipv4) = addr.to_ipv4() {
94 if addr.octets()[0..10].iter().all(|&b| b == 0)
95 && addr.octets()[10] == 0xFF
96 && addr.octets()[11] == 0xFF
97 {
98 Some(format!("::ffff:{}", ipv4))
99 } else {
100 Some(addr.to_string())
101 }
102 } else {
103 Some(addr.to_string())
104 }
105 }
106 _ => None,
107 };
108
109 results.push(ip_str.as_deref());
110 }
111
112 Ok(results.to_vector())
113 }
114}
115
116#[derive(Clone, Debug, Default, Display)]
124#[display("{}", self.name())]
125pub struct Ipv6StringToNum;
126
127impl Function for Ipv6StringToNum {
128 fn name(&self) -> &str {
129 "ipv6_string_to_num"
130 }
131
132 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
133 Ok(DataType::Binary)
134 }
135
136 fn signature(&self) -> Signature {
137 Signature::string(1, Volatility::Immutable)
138 }
139
140 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
141 ensure!(
142 columns.len() == 1,
143 InvalidFuncArgsSnafu {
144 err_msg: format!("Expected 1 argument, got {}", columns.len())
145 }
146 );
147
148 let ip_vec = &columns[0];
149 let size = ip_vec.len();
150 let mut results = BinaryVectorBuilder::with_capacity(size);
151
152 for i in 0..size {
153 let ip_str = ip_vec.get(i);
154 let ip_binary = match ip_str {
155 Value::String(s) => {
156 let addr_str = s.as_utf8();
157
158 let addr = if let Ok(ipv6) = Ipv6Addr::from_str(addr_str) {
159 ipv6
161 } else if let Ok(ipv4) = Ipv4Addr::from_str(addr_str) {
162 ipv4.to_ipv6_mapped()
164 } else {
165 return InvalidFuncArgsSnafu {
167 err_msg: format!("Invalid IPv6 address format: {}", addr_str),
168 }
169 .fail();
170 };
171
172 let octets = addr.octets();
174 Some(octets.to_vec())
175 }
176 _ => None,
177 };
178
179 results.push(ip_binary.as_deref());
180 }
181
182 Ok(results.to_vector())
183 }
184}
185
#[cfg(test)]
mod tests {
    use std::fmt::Write;
    use std::sync::Arc;

    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::{BinaryVector, StringVector, Vector};

    use super::*;

    /// Hex encoding of `2001:db8::1`.
    const GLOBAL_HEX: &str = "20010db8000000000000000000000001";
    /// Hex encoding of the IPv4-mapped address `::ffff:192.168.0.1`.
    const MAPPED_HEX: &str = "00000000000000000000ffffc0a80001";

    /// Octets of `2001:db8::1`.
    const GLOBAL_OCTETS: [u8; 16] = [
        0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01,
    ];
    /// Octets of `::ffff:192.168.0.1`.
    const MAPPED_OCTETS: [u8; 16] = [
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0xC0, 0xA8, 0, 0x01,
    ];

    /// Wraps string literals into a single-column function argument.
    fn string_column(values: &[&str]) -> VectorRef {
        Arc::new(StringVector::from_slice(values)) as VectorRef
    }

    /// Views a function result as a string vector, panicking on a type mismatch.
    fn as_strings(vector: &VectorRef) -> &StringVector {
        vector.as_any().downcast_ref::<StringVector>().unwrap()
    }

    /// Views a function result as a binary vector, panicking on a type mismatch.
    fn as_binaries(vector: &VectorRef) -> &BinaryVector {
        vector.as_any().downcast_ref::<BinaryVector>().unwrap()
    }

    #[test]
    fn test_ipv6_num_to_string() {
        let ctx = FunctionContext::default();
        let input = string_column(&[GLOBAL_HEX, MAPPED_HEX]);

        let output = Ipv6NumToString.eval(&ctx, &[input]).unwrap();
        let texts = as_strings(&output);

        assert_eq!(texts.get_data(0).unwrap(), "2001:db8::1");
        assert_eq!(texts.get_data(1).unwrap(), "::ffff:192.168.0.1");
    }

    #[test]
    fn test_ipv6_num_to_string_uppercase() {
        let ctx = FunctionContext::default();
        let input = string_column(&["20010DB8000000000000000000000001"]);

        let output = Ipv6NumToString.eval(&ctx, &[input]).unwrap();
        let texts = as_strings(&output);

        assert_eq!(texts.get_data(0).unwrap(), "2001:db8::1");
    }

    #[test]
    fn test_ipv6_num_to_string_error() {
        let ctx = FunctionContext::default();
        // Only 8 hex digits instead of the required 32.
        let input = string_column(&["20010db8"]);

        let outcome = Ipv6NumToString.eval(&ctx, &[input]);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Expected 32 hex characters"));
    }

    #[test]
    fn test_ipv6_string_to_num() {
        let ctx = FunctionContext::default();
        let input = string_column(&["2001:db8::1", "::ffff:192.168.0.1", "192.168.0.1"]);

        let output = Ipv6StringToNum.eval(&ctx, &[input]).unwrap();
        let binaries = as_binaries(&output);

        assert_eq!(binaries.get_data(0).unwrap(), &GLOBAL_OCTETS);
        assert_eq!(binaries.get_data(1).unwrap(), &MAPPED_OCTETS);
        // Plain IPv4 text is stored as the IPv4-mapped address.
        assert_eq!(binaries.get_data(2).unwrap(), &MAPPED_OCTETS);
    }

    #[test]
    fn test_ipv6_conversions_roundtrip() {
        let ctx = FunctionContext::default();
        let addresses = ["2001:db8::1", "::ffff:192.168.0.1"];

        // Text -> binary.
        let encoded = Ipv6StringToNum
            .eval(&ctx, &[string_column(&addresses)])
            .unwrap();
        let binaries = as_binaries(&encoded);

        // Binary -> lower-case hex text, the input format of the reverse function.
        let mut hex_rows = Vec::with_capacity(binaries.len());
        for row in 0..binaries.len() {
            let mut hex = String::new();
            for byte in binaries.get_data(row).unwrap() {
                write!(&mut hex, "{:02x}", byte).unwrap();
            }
            hex_rows.push(hex);
        }
        let hex_refs: Vec<&str> = hex_rows.iter().map(String::as_str).collect();

        // Hex -> text; must reproduce the original spelling.
        let decoded = Ipv6NumToString
            .eval(&ctx, &[string_column(&hex_refs)])
            .unwrap();
        let texts = as_strings(&decoded);

        assert_eq!(texts.get_data(0).unwrap(), addresses[0]);
        assert_eq!(texts.get_data(1).unwrap(), addresses[1]);
    }

    #[test]
    fn test_ipv6_conversions_hex_roundtrip() {
        let ctx = FunctionContext::default();
        let hex_input = string_column(&[GLOBAL_HEX, MAPPED_HEX]);

        // Hex -> text -> binary.
        let as_text = Ipv6NumToString.eval(&ctx, &[hex_input]).unwrap();
        let as_binary = Ipv6StringToNum.eval(&ctx, &[as_text]).unwrap();
        let binaries = as_binaries(&as_binary);

        assert_eq!(binaries.get_data(0).unwrap(), &GLOBAL_OCTETS);
        assert_eq!(binaries.get_data(1).unwrap(), &MAPPED_OCTETS);
    }
}