common_function/table/
migrate_region.rsuse std::time::Duration;
use common_macro::admin_fn;
use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
const DEFAULT_TIMEOUT_SECS: u64 = 30;
#[admin_fn(
name = MigrateRegionFunction,
display_name = migrate_region,
sig_fn = signature,
ret = string
)]
pub(crate) async fn migrate_region(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (region_id, from_peer, to_peer, timeout) = match params.len() {
3 => {
let region_id = cast_u64(¶ms[0])?;
let from_peer = cast_u64(¶ms[1])?;
let to_peer = cast_u64(¶ms[2])?;
(region_id, from_peer, to_peer, Some(DEFAULT_TIMEOUT_SECS))
}
4 => {
let region_id = cast_u64(¶ms[0])?;
let from_peer = cast_u64(¶ms[1])?;
let to_peer = cast_u64(¶ms[2])?;
let replay_timeout = cast_u64(¶ms[3])?;
(region_id, from_peer, to_peer, replay_timeout)
}
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
size
),
}
.fail();
}
};
match (region_id, from_peer, to_peer, timeout) {
(Some(region_id), Some(from_peer), Some(to_peer), Some(timeout)) => {
let pid = procedure_service_handler
.migrate_region(MigrateRegionRequest {
region_id,
from_peer,
to_peer,
timeout: Duration::from_secs(timeout),
})
.await?;
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
_ => Ok(Value::Null),
}
}
fn signature() -> Signature {
Signature::one_of(
vec![
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_migrate_region_misc() {
let f = MigrateRegionFunction;
assert_eq!("migrate_region", f.name());
assert_eq!(
ConcreteDataType::string_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 2));
}
#[tokio::test]
async fn test_missing_procedure_service() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).await.unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
#[tokio::test]
async fn test_migrate_region() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
}
}