common_function/admin/
migrate_region.rs1use std::time::Duration;
16
17use common_macro::admin_fn;
18use common_meta::rpc::procedure::MigrateRegionRequest;
19use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
20use common_query::prelude::{Signature, TypeSignature, Volatility};
21use datatypes::prelude::ConcreteDataType;
22use datatypes::value::{Value, ValueRef};
23use session::context::QueryContextRef;
24
25use crate::handlers::ProcedureServiceHandlerRef;
26use crate::helper::cast_u64;
27
/// Timeout, in seconds, that `migrate_region` uses when it is called with
/// only three arguments (i.e. without an explicit timeout).
const DEFAULT_TIMEOUT_SECS: u64 = 5 * 60;
30
31#[admin_fn(
42 name = MigrateRegionFunction,
43 display_name = migrate_region,
44 sig_fn = signature,
45 ret = string
46)]
47pub(crate) async fn migrate_region(
48 procedure_service_handler: &ProcedureServiceHandlerRef,
49 _ctx: &QueryContextRef,
50 params: &[ValueRef<'_>],
51) -> Result<Value> {
52 let (region_id, from_peer, to_peer, timeout) = match params.len() {
53 3 => {
54 let region_id = cast_u64(¶ms[0])?;
55 let from_peer = cast_u64(¶ms[1])?;
56 let to_peer = cast_u64(¶ms[2])?;
57
58 (region_id, from_peer, to_peer, Some(DEFAULT_TIMEOUT_SECS))
59 }
60
61 4 => {
62 let region_id = cast_u64(¶ms[0])?;
63 let from_peer = cast_u64(¶ms[1])?;
64 let to_peer = cast_u64(¶ms[2])?;
65 let replay_timeout = cast_u64(¶ms[3])?;
66
67 (region_id, from_peer, to_peer, replay_timeout)
68 }
69
70 size => {
71 return InvalidFuncArgsSnafu {
72 err_msg: format!(
73 "The length of the args is not correct, expect exactly 3 or 4, have: {}",
74 size
75 ),
76 }
77 .fail();
78 }
79 };
80
81 match (region_id, from_peer, to_peer, timeout) {
82 (Some(region_id), Some(from_peer), Some(to_peer), Some(timeout)) => {
83 let pid = procedure_service_handler
84 .migrate_region(MigrateRegionRequest {
85 region_id,
86 from_peer,
87 to_peer,
88 timeout: Duration::from_secs(timeout),
89 })
90 .await?;
91
92 match pid {
93 Some(pid) => Ok(Value::from(pid)),
94 None => Ok(Value::Null),
95 }
96 }
97
98 _ => Ok(Value::Null),
99 }
100}
101
102fn signature() -> Signature {
103 Signature::one_of(
104 vec![
105 TypeSignature::Uniform(3, ConcreteDataType::numerics()),
107 TypeSignature::Uniform(4, ConcreteDataType::numerics()),
109 ],
110 Volatility::Immutable,
111 )
112}
113
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::prelude::TypeSignature;
    use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};

    use super::*;
    use crate::function::{AsyncFunction, FunctionContext};

    /// Wraps each value in its own single-row `UInt64Vector` column.
    fn single_row_columns(values: &[u64]) -> Vec<VectorRef> {
        values
            .iter()
            .map(|&value| Arc::new(UInt64Vector::from_slice([value])) as VectorRef)
            .collect()
    }

    /// Checks the function's name, return type and signature shape.
    #[test]
    fn test_migrate_region_misc() {
        let func = MigrateRegionFunction;

        assert_eq!("migrate_region", func.name());
        assert_eq!(
            ConcreteDataType::string_datatype(),
            func.return_type(&[]).unwrap()
        );

        let sig = func.signature();
        assert!(matches!(sig.volatility, Volatility::Immutable));
        assert!(matches!(
            &sig.type_signature,
            TypeSignature::OneOf(sigs) if sigs.len() == 2
        ));
    }

    /// Evaluating with a default context must fail with the
    /// missing-handler error.
    #[tokio::test]
    async fn test_missing_procedure_service() {
        let func = MigrateRegionFunction;
        let columns = single_row_columns(&[1, 1, 1]);

        let err = func
            .eval(FunctionContext::default(), &columns)
            .await
            .unwrap_err();

        assert_eq!(
            "Missing ProcedureServiceHandler, not expected",
            err.to_string()
        );
    }

    /// Evaluating with the mock context must yield the mock procedure id.
    #[tokio::test]
    async fn test_migrate_region() {
        let func = MigrateRegionFunction;
        let columns = single_row_columns(&[1, 1, 1]);

        let actual = func.eval(FunctionContext::mock(), &columns).await.unwrap();

        let expected: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
        assert_eq!(expected, actual);
    }
}