common_function/admin/
migrate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_macro::admin_fn;
18use common_meta::rpc::procedure::MigrateRegionRequest;
19use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
20use common_query::prelude::{Signature, TypeSignature, Volatility};
21use datatypes::prelude::ConcreteDataType;
22use datatypes::value::{Value, ValueRef};
23use session::context::QueryContextRef;
24
25use crate::handlers::ProcedureServiceHandlerRef;
26use crate::helper::cast_u64;
27
/// Timeout, in seconds, applied to a region migration when the caller
/// does not pass an explicit timeout argument (five minutes).
const DEFAULT_TIMEOUT_SECS: u64 = 5 * 60;
30
31/// A function to migrate a region from source peer to target peer.
32/// Returns the submitted procedure id if success. Only available in cluster mode.
33///
34/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(300 seconds).
35/// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))`.
36///
37/// The parameters:
38/// - `region_id`:  the region id
39/// - `from_peer`:  the source peer id
40/// - `to_peer`:  the target peer id
41#[admin_fn(
42    name = MigrateRegionFunction,
43    display_name = migrate_region,
44    sig_fn = signature,
45    ret = string
46)]
47pub(crate) async fn migrate_region(
48    procedure_service_handler: &ProcedureServiceHandlerRef,
49    _ctx: &QueryContextRef,
50    params: &[ValueRef<'_>],
51) -> Result<Value> {
52    let (region_id, from_peer, to_peer, timeout) = match params.len() {
53        3 => {
54            let region_id = cast_u64(&params[0])?;
55            let from_peer = cast_u64(&params[1])?;
56            let to_peer = cast_u64(&params[2])?;
57
58            (region_id, from_peer, to_peer, Some(DEFAULT_TIMEOUT_SECS))
59        }
60
61        4 => {
62            let region_id = cast_u64(&params[0])?;
63            let from_peer = cast_u64(&params[1])?;
64            let to_peer = cast_u64(&params[2])?;
65            let replay_timeout = cast_u64(&params[3])?;
66
67            (region_id, from_peer, to_peer, replay_timeout)
68        }
69
70        size => {
71            return InvalidFuncArgsSnafu {
72                err_msg: format!(
73                    "The length of the args is not correct, expect exactly 3 or 4, have: {}",
74                    size
75                ),
76            }
77            .fail();
78        }
79    };
80
81    match (region_id, from_peer, to_peer, timeout) {
82        (Some(region_id), Some(from_peer), Some(to_peer), Some(timeout)) => {
83            let pid = procedure_service_handler
84                .migrate_region(MigrateRegionRequest {
85                    region_id,
86                    from_peer,
87                    to_peer,
88                    timeout: Duration::from_secs(timeout),
89                })
90                .await?;
91
92            match pid {
93                Some(pid) => Ok(Value::from(pid)),
94                None => Ok(Value::Null),
95            }
96        }
97
98        _ => Ok(Value::Null),
99    }
100}
101
102fn signature() -> Signature {
103    Signature::one_of(
104        vec![
105            // migrate_region(region_id, from_peer, to_peer)
106            TypeSignature::Uniform(3, ConcreteDataType::numerics()),
107            // migrate_region(region_id, from_peer, to_peer, timeout(secs))
108            TypeSignature::Uniform(4, ConcreteDataType::numerics()),
109        ],
110        Volatility::Immutable,
111    )
112}
113
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::prelude::TypeSignature;
    use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};

    use super::*;
    use crate::function::{AsyncFunction, FunctionContext};

    /// Wraps every value in its own single-row `UInt64Vector`, producing one
    /// vector per function argument.
    fn single_row_args(values: &[u64]) -> Vec<VectorRef> {
        values
            .iter()
            .map(|&value| Arc::new(UInt64Vector::from_slice([value])) as VectorRef)
            .collect()
    }

    /// Checks the function's name, return type and signature shape.
    #[test]
    fn test_migrate_region_misc() {
        let func = MigrateRegionFunction;

        assert_eq!("migrate_region", func.name());

        let return_type = func.return_type(&[]).unwrap();
        assert_eq!(ConcreteDataType::string_datatype(), return_type);

        let has_two_immutable_variants = matches!(
            func.signature(),
            Signature {
                type_signature: TypeSignature::OneOf(variants),
                volatility: Volatility::Immutable
            } if variants.len() == 2
        );
        assert!(has_two_immutable_variants);
    }

    /// Evaluating with the default context must fail with the
    /// missing-handler error.
    #[tokio::test]
    async fn test_missing_procedure_service() {
        let func = MigrateRegionFunction;
        let args = single_row_args(&[1, 1, 1]);

        let error = func
            .eval(FunctionContext::default(), &args)
            .await
            .unwrap_err();

        assert_eq!(
            "Missing ProcedureServiceHandler, not expected",
            error.to_string()
        );
    }

    /// Evaluating with the mock context must return the procedure id "test_pid".
    #[tokio::test]
    async fn test_migrate_region() {
        let func = MigrateRegionFunction;
        let args = single_row_args(&[1, 1, 1]);

        let actual = func.eval(FunctionContext::mock(), &args).await.unwrap();

        let expected: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
        assert_eq!(expected, actual);
    }
}