common_function/scalars/ip/
cidr.rsuse std::net::{Ipv4Addr, Ipv6Addr};
use std::str::FromStr;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use derive_more::Display;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct Ipv4ToCidr;
impl Function for Ipv4ToCidr {
fn name(&self) -> &str {
"ipv4_to_cidr"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::uint8_datatype(),
]),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1 || columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!("Expected 1 or 2 arguments, got {}", columns.len())
}
);
let ip_vec = &columns[0];
let mut results = StringVectorBuilder::with_capacity(ip_vec.len());
let has_subnet_arg = columns.len() == 2;
let subnet_vec = if has_subnet_arg {
ensure!(
columns[1].len() == ip_vec.len(),
InvalidFuncArgsSnafu {
err_msg:
"Subnet mask must have the same number of elements as the IP addresses"
.to_string()
}
);
Some(&columns[1])
} else {
None
};
for i in 0..ip_vec.len() {
let ip_str = ip_vec.get(i);
let subnet = subnet_vec.map(|v| v.get(i));
let cidr = match (ip_str, subnet) {
(Value::String(s), Some(Value::UInt8(mask))) => {
let ip_str = s.as_utf8().trim();
if ip_str.is_empty() {
return InvalidFuncArgsSnafu {
err_msg: "Empty IPv4 address".to_string(),
}
.fail();
}
let ip_addr = complete_and_parse_ipv4(ip_str)?;
let mask_bits = u32::MAX.wrapping_shl(32 - mask as u32);
let masked_ip = Ipv4Addr::from(u32::from(ip_addr) & mask_bits);
Some(format!("{}/{}", masked_ip, mask))
}
(Value::String(s), None) => {
let ip_str = s.as_utf8().trim();
if ip_str.is_empty() {
return InvalidFuncArgsSnafu {
err_msg: "Empty IPv4 address".to_string(),
}
.fail();
}
let ip_addr = complete_and_parse_ipv4(ip_str)?;
let ip_bits = u32::from(ip_addr);
let dots = ip_str.chars().filter(|&c| c == '.').count();
let subnet_mask = match dots {
0 => 8, 1 => 16, 2 => 24, _ => {
let trailing_zeros = ip_bits.trailing_zeros();
if trailing_zeros % 8 == 0 {
32 - trailing_zeros.min(32) as u8
} else {
32 - (trailing_zeros as u8 / 8) * 8
}
}
};
let mask_bits = u32::MAX.wrapping_shl(32 - subnet_mask as u32);
let masked_ip = Ipv4Addr::from(ip_bits & mask_bits);
Some(format!("{}/{}", masked_ip, subnet_mask))
}
_ => None,
};
results.push(cidr.as_deref());
}
Ok(results.to_vector())
}
}
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct Ipv6ToCidr;
impl Function for Ipv6ToCidr {
fn name(&self) -> &str {
"ipv6_to_cidr"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::uint8_datatype(),
]),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1 || columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!("Expected 1 or 2 arguments, got {}", columns.len())
}
);
let ip_vec = &columns[0];
let size = ip_vec.len();
let mut results = StringVectorBuilder::with_capacity(size);
let has_subnet_arg = columns.len() == 2;
let subnet_vec = if has_subnet_arg {
Some(&columns[1])
} else {
None
};
for i in 0..size {
let ip_str = ip_vec.get(i);
let subnet = subnet_vec.map(|v| v.get(i));
let cidr = match (ip_str, subnet) {
(Value::String(s), Some(Value::UInt8(mask))) => {
let ip_str = s.as_utf8().trim();
if ip_str.is_empty() {
return InvalidFuncArgsSnafu {
err_msg: "Empty IPv6 address".to_string(),
}
.fail();
}
let ip_addr = complete_and_parse_ipv6(ip_str)?;
let masked_ip = mask_ipv6(&ip_addr, mask);
Some(format!("{}/{}", masked_ip, mask))
}
(Value::String(s), None) => {
let ip_str = s.as_utf8().trim();
if ip_str.is_empty() {
return InvalidFuncArgsSnafu {
err_msg: "Empty IPv6 address".to_string(),
}
.fail();
}
let ip_addr = complete_and_parse_ipv6(ip_str)?;
let subnet_mask = auto_detect_ipv6_subnet(&ip_addr);
let masked_ip = mask_ipv6(&ip_addr, subnet_mask);
Some(format!("{}/{}", masked_ip, subnet_mask))
}
_ => None,
};
results.push(cidr.as_deref());
}
Ok(results.to_vector())
}
}
fn complete_and_parse_ipv4(ip_str: &str) -> Result<Ipv4Addr> {
if let Ok(addr) = Ipv4Addr::from_str(ip_str) {
return Ok(addr);
}
let dots = ip_str.chars().filter(|&c| c == '.').count();
let completed = match dots {
0 => format!("{}.0.0.0", ip_str),
1 => format!("{}.0.0", ip_str),
2 => format!("{}.0", ip_str),
_ => ip_str.to_string(),
};
Ipv4Addr::from_str(&completed).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!("Invalid IPv4 address: {}", ip_str),
}
.build()
})
}
fn complete_and_parse_ipv6(ip_str: &str) -> Result<Ipv6Addr> {
if let Ok(addr) = Ipv6Addr::from_str(ip_str) {
return Ok(addr);
}
let completed = if ip_str.ends_with(':') {
format!("{}:", ip_str)
} else if !ip_str.contains("::") {
format!("{}::", ip_str)
} else {
ip_str.to_string()
};
Ipv6Addr::from_str(&completed).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!("Invalid IPv6 address: {}", ip_str),
}
.build()
})
}
fn mask_ipv6(addr: &Ipv6Addr, subnet: u8) -> Ipv6Addr {
let octets = addr.octets();
let mut result = [0u8; 16];
for i in 0..16 {
let bit_pos = i * 8;
if bit_pos < subnet as usize {
if bit_pos + 8 <= subnet as usize {
result[i] = octets[i];
} else {
let shift = 8 - (subnet as usize - bit_pos);
result[i] = octets[i] & (0xFF << shift);
}
}
}
Ipv6Addr::from(result)
}
fn auto_detect_ipv6_subnet(addr: &Ipv6Addr) -> u8 {
let segments = addr.segments();
let str_addr = addr.to_string();
if str_addr.starts_with("2001:db8::") || str_addr.starts_with("2001:db8:") {
return 32;
}
if str_addr == "::1" {
return 128; }
if str_addr.starts_with("fe80::") {
return 16; }
let mut subnet = 128;
for i in (0..8).rev() {
if segments[i] != 0 {
if segments[i] & 0xFF == 0 {
subnet = (i * 16) + 8;
} else {
subnet = (i + 1) * 16; }
break;
}
}
if subnet < 16 {
subnet = 64;
}
subnet as u8
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{StringVector, UInt8Vector};
use super::*;
#[test]
fn test_ipv4_to_cidr_auto() {
let func = Ipv4ToCidr;
let ctx = FunctionContext::default();
let values = vec!["192.168.1.0", "10.0.0.0", "172.16", "192"];
let input = Arc::new(StringVector::from_slice(&values)) as VectorRef;
let result = func.eval(&ctx, &[input]).unwrap();
let result = result.as_any().downcast_ref::<StringVector>().unwrap();
assert_eq!(result.get_data(0).unwrap(), "192.168.1.0/24");
assert_eq!(result.get_data(1).unwrap(), "10.0.0.0/8");
assert_eq!(result.get_data(2).unwrap(), "172.16.0.0/16");
assert_eq!(result.get_data(3).unwrap(), "192.0.0.0/8");
}
#[test]
fn test_ipv4_to_cidr_with_subnet() {
let func = Ipv4ToCidr;
let ctx = FunctionContext::default();
let ip_values = vec!["192.168.1.1", "10.0.0.1", "172.16.5.5"];
let subnet_values = vec![24u8, 16u8, 12u8];
let ip_input = Arc::new(StringVector::from_slice(&ip_values)) as VectorRef;
let subnet_input = Arc::new(UInt8Vector::from_vec(subnet_values)) as VectorRef;
let result = func.eval(&ctx, &[ip_input, subnet_input]).unwrap();
let result = result.as_any().downcast_ref::<StringVector>().unwrap();
assert_eq!(result.get_data(0).unwrap(), "192.168.1.0/24");
assert_eq!(result.get_data(1).unwrap(), "10.0.0.0/16");
assert_eq!(result.get_data(2).unwrap(), "172.16.0.0/12");
}
#[test]
fn test_ipv6_to_cidr_auto() {
let func = Ipv6ToCidr;
let ctx = FunctionContext::default();
let values = vec!["2001:db8::", "2001:db8", "fe80::1", "::1"];
let input = Arc::new(StringVector::from_slice(&values)) as VectorRef;
let result = func.eval(&ctx, &[input]).unwrap();
let result = result.as_any().downcast_ref::<StringVector>().unwrap();
assert_eq!(result.get_data(0).unwrap(), "2001:db8::/32");
assert_eq!(result.get_data(1).unwrap(), "2001:db8::/32");
assert_eq!(result.get_data(2).unwrap(), "fe80::/16");
assert_eq!(result.get_data(3).unwrap(), "::1/128"); }
#[test]
fn test_ipv6_to_cidr_with_subnet() {
let func = Ipv6ToCidr;
let ctx = FunctionContext::default();
let ip_values = vec!["2001:db8::", "fe80::1", "2001:db8:1234::"];
let subnet_values = vec![48u8, 10u8, 56u8];
let ip_input = Arc::new(StringVector::from_slice(&ip_values)) as VectorRef;
let subnet_input = Arc::new(UInt8Vector::from_vec(subnet_values)) as VectorRef;
let result = func.eval(&ctx, &[ip_input, subnet_input]).unwrap();
let result = result.as_any().downcast_ref::<StringVector>().unwrap();
assert_eq!(result.get_data(0).unwrap(), "2001:db8::/48");
assert_eq!(result.get_data(1).unwrap(), "fe80::/10");
assert_eq!(result.get_data(2).unwrap(), "2001:db8:1234::/56");
}
#[test]
fn test_invalid_inputs() {
let ipv4_func = Ipv4ToCidr;
let ipv6_func = Ipv6ToCidr;
let ctx = FunctionContext::default();
let empty_values = vec![""];
let empty_input = Arc::new(StringVector::from_slice(&empty_values)) as VectorRef;
let ipv4_result = ipv4_func.eval(&ctx, &[empty_input.clone()]);
let ipv6_result = ipv6_func.eval(&ctx, &[empty_input.clone()]);
assert!(ipv4_result.is_err());
assert!(ipv6_result.is_err());
let invalid_values = vec!["not an ip", "192.168.1.256", "zzzz::ffff"];
let invalid_input = Arc::new(StringVector::from_slice(&invalid_values)) as VectorRef;
let ipv4_result = ipv4_func.eval(&ctx, &[invalid_input.clone()]);
assert!(ipv4_result.is_err());
}
}