common_function/scalars/ip/
ipv6.rs1use std::net::{Ipv4Addr, Ipv6Addr};
16use std::str::FromStr;
17
18use common_query::error::{InvalidFuncArgsSnafu, Result};
19use common_query::prelude::{Signature, TypeSignature};
20use datafusion::logical_expr::Volatility;
21use datatypes::prelude::{ConcreteDataType, Value};
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{BinaryVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
24use derive_more::Display;
25use snafu::ensure;
26
27use crate::function::{Function, FunctionContext};
28
29#[derive(Clone, Debug, Default, Display)]
35#[display("{}", self.name())]
36pub struct Ipv6NumToString;
37
38impl Function for Ipv6NumToString {
39 fn name(&self) -> &str {
40 "ipv6_num_to_string"
41 }
42
43 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
44 Ok(ConcreteDataType::string_datatype())
45 }
46
47 fn signature(&self) -> Signature {
48 Signature::new(
49 TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
50 Volatility::Immutable,
51 )
52 }
53
54 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
55 ensure!(
56 columns.len() == 1,
57 InvalidFuncArgsSnafu {
58 err_msg: format!("Expected 1 argument, got {}", columns.len())
59 }
60 );
61
62 let hex_vec = &columns[0];
63 let size = hex_vec.len();
64 let mut results = StringVectorBuilder::with_capacity(size);
65
66 for i in 0..size {
67 let hex_str = hex_vec.get(i);
68 let ip_str = match hex_str {
69 Value::String(s) => {
70 let hex_str = s.as_utf8().to_lowercase();
71
72 let bytes = if hex_str.len() == 32 {
74 let mut bytes = [0u8; 16];
75 for i in 0..16 {
76 let byte_str = &hex_str[i * 2..i * 2 + 2];
77 bytes[i] = u8::from_str_radix(byte_str, 16).map_err(|_| {
78 InvalidFuncArgsSnafu {
79 err_msg: format!("Invalid hex characters in '{}'", byte_str),
80 }
81 .build()
82 })?;
83 }
84 bytes
85 } else {
86 return InvalidFuncArgsSnafu {
87 err_msg: format!("Expected 32 hex characters, got {}", hex_str.len()),
88 }
89 .fail();
90 };
91
92 let addr = Ipv6Addr::from(bytes);
94
95 if let Some(ipv4) = addr.to_ipv4() {
97 if addr.octets()[0..10].iter().all(|&b| b == 0)
98 && addr.octets()[10] == 0xFF
99 && addr.octets()[11] == 0xFF
100 {
101 Some(format!("::ffff:{}", ipv4))
102 } else {
103 Some(addr.to_string())
104 }
105 } else {
106 Some(addr.to_string())
107 }
108 }
109 _ => None,
110 };
111
112 results.push(ip_str.as_deref());
113 }
114
115 Ok(results.to_vector())
116 }
117}
118
119#[derive(Clone, Debug, Default, Display)]
127#[display("{}", self.name())]
128pub struct Ipv6StringToNum;
129
130impl Function for Ipv6StringToNum {
131 fn name(&self) -> &str {
132 "ipv6_string_to_num"
133 }
134
135 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
136 Ok(ConcreteDataType::binary_datatype())
137 }
138
139 fn signature(&self) -> Signature {
140 Signature::new(
141 TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
142 Volatility::Immutable,
143 )
144 }
145
146 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
147 ensure!(
148 columns.len() == 1,
149 InvalidFuncArgsSnafu {
150 err_msg: format!("Expected 1 argument, got {}", columns.len())
151 }
152 );
153
154 let ip_vec = &columns[0];
155 let size = ip_vec.len();
156 let mut results = BinaryVectorBuilder::with_capacity(size);
157
158 for i in 0..size {
159 let ip_str = ip_vec.get(i);
160 let ip_binary = match ip_str {
161 Value::String(s) => {
162 let addr_str = s.as_utf8();
163
164 let addr = if let Ok(ipv6) = Ipv6Addr::from_str(addr_str) {
165 ipv6
167 } else if let Ok(ipv4) = Ipv4Addr::from_str(addr_str) {
168 ipv4.to_ipv6_mapped()
170 } else {
171 return InvalidFuncArgsSnafu {
173 err_msg: format!("Invalid IPv6 address format: {}", addr_str),
174 }
175 .fail();
176 };
177
178 let octets = addr.octets();
180 Some(octets.to_vec())
181 }
182 _ => None,
183 };
184
185 results.push(ip_binary.as_deref());
186 }
187
188 Ok(results.to_vector())
189 }
190}
191
#[cfg(test)]
mod tests {
    use std::fmt::Write;
    use std::sync::Arc;

    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::{BinaryVector, StringVector, Vector};

    use super::*;

    /// Hex form of `2001:db8::1`.
    const DOC_ADDR_HEX: &str = "20010db8000000000000000000000001";
    /// Hex form of `::ffff:192.168.0.1`.
    const MAPPED_V4_HEX: &str = "00000000000000000000ffffc0a80001";

    /// Address bytes of `2001:db8::1`.
    const DOC_ADDR_OCTETS: [u8; 16] = [
        0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01,
    ];
    /// Address bytes of `::ffff:192.168.0.1`.
    const MAPPED_V4_OCTETS: [u8; 16] = [
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0xC0, 0xA8, 0, 0x01,
    ];

    /// Builds a string column from the given rows.
    fn string_column(rows: &[&str]) -> VectorRef {
        Arc::new(StringVector::from_slice(rows)) as VectorRef
    }

    /// Views a function result as a string vector.
    fn as_strings(column: &VectorRef) -> &StringVector {
        column.as_any().downcast_ref::<StringVector>().unwrap()
    }

    /// Views a function result as a binary vector.
    fn as_binaries(column: &VectorRef) -> &BinaryVector {
        column.as_any().downcast_ref::<BinaryVector>().unwrap()
    }

    #[test]
    fn test_ipv6_num_to_string() {
        let func = Ipv6NumToString;
        let ctx = FunctionContext::default();

        let input = string_column(&[DOC_ADDR_HEX, MAPPED_V4_HEX]);
        let output = func.eval(&ctx, &[input]).unwrap();
        let texts = as_strings(&output);

        assert_eq!(texts.get_data(0).unwrap(), "2001:db8::1");
        assert_eq!(texts.get_data(1).unwrap(), "::ffff:192.168.0.1");
    }

    #[test]
    fn test_ipv6_num_to_string_uppercase() {
        let func = Ipv6NumToString;
        let ctx = FunctionContext::default();

        // Upper-case hex digits must be accepted as well.
        let input = string_column(&["20010DB8000000000000000000000001"]);
        let output = func.eval(&ctx, &[input]).unwrap();
        let texts = as_strings(&output);

        assert_eq!(texts.get_data(0).unwrap(), "2001:db8::1");
    }

    #[test]
    fn test_ipv6_num_to_string_error() {
        let func = Ipv6NumToString;
        let ctx = FunctionContext::default();

        // Too short: only 8 of the required 32 hex characters.
        let input = string_column(&["20010db8"]);
        let outcome = func.eval(&ctx, &[input]);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Expected 32 hex characters"));
    }

    #[test]
    fn test_ipv6_string_to_num() {
        let func = Ipv6StringToNum;
        let ctx = FunctionContext::default();

        let input = string_column(&["2001:db8::1", "::ffff:192.168.0.1", "192.168.0.1"]);
        let output = func.eval(&ctx, &[input]).unwrap();
        let binaries = as_binaries(&output);

        assert_eq!(binaries.get_data(0).unwrap(), &DOC_ADDR_OCTETS);
        assert_eq!(binaries.get_data(1).unwrap(), &MAPPED_V4_OCTETS);
        // Plain IPv4 input is stored in its IPv4-mapped form.
        assert_eq!(binaries.get_data(2).unwrap(), &MAPPED_V4_OCTETS);
    }

    #[test]
    fn test_ipv6_conversions_roundtrip() {
        let to_num = Ipv6StringToNum;
        let to_string = Ipv6NumToString;
        let ctx = FunctionContext::default();

        let addresses = ["2001:db8::1", "::ffff:192.168.0.1"];

        // text -> binary
        let binary_column = to_num.eval(&ctx, &[string_column(&addresses)]).unwrap();
        let binaries = as_binaries(&binary_column);

        // binary -> hex text, which is the input format of `ipv6_num_to_string`
        let mut hex_rows = Vec::with_capacity(binaries.len());
        for row in 0..binaries.len() {
            let mut hex = String::new();
            for byte in binaries.get_data(row).unwrap() {
                write!(&mut hex, "{:02x}", byte).unwrap();
            }
            hex_rows.push(hex);
        }
        let hex_refs: Vec<&str> = hex_rows.iter().map(String::as_str).collect();

        // hex text -> text
        let text_column = to_string.eval(&ctx, &[string_column(&hex_refs)]).unwrap();
        let texts = as_strings(&text_column);

        assert_eq!(texts.get_data(0).unwrap(), addresses[0]);
        assert_eq!(texts.get_data(1).unwrap(), addresses[1]);
    }

    #[test]
    fn test_ipv6_conversions_hex_roundtrip() {
        let to_string = Ipv6NumToString;
        let to_binary = Ipv6StringToNum;
        let ctx = FunctionContext::default();

        // hex text -> text -> binary
        let hex_input = string_column(&[DOC_ADDR_HEX, MAPPED_V4_HEX]);
        let text_column = to_string.eval(&ctx, &[hex_input]).unwrap();
        let binary_column = to_binary.eval(&ctx, &[text_column]).unwrap();
        let binaries = as_binaries(&binary_column);

        assert_eq!(binaries.get_data(0).unwrap(), &DOC_ADDR_OCTETS);
        assert_eq!(binaries.get_data(1).unwrap(), &MAPPED_V4_OCTETS);
    }
}