tests_fuzz/utils/
migration.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::time::Duration;
17
18use common_telemetry::info;
19use snafu::ResultExt;
20use sqlx::{MySql, Pool, Row};
21use store_api::storage::RegionId;
22
23use crate::error::{self};
24use crate::ir::Ident;
25use crate::utils::partition::{fetch_partitions, region_distribution};
26use crate::utils::wait::wait_condition_fn;
27
28/// Migrates a region from one peer to another within a specified timeout.
29///
30/// Returns the procedure id.
31pub async fn migrate_region(
32    e: &Pool<MySql>,
33    region_id: u64,
34    from_peer_id: u64,
35    to_peer_id: u64,
36    timeout_secs: u64,
37) -> String {
38    let sql =
39        format!("admin migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs});");
40    let result = sqlx::query(&sql)
41        .fetch_one(e)
42        .await
43        .context(error::ExecuteQuerySnafu { sql })
44        .unwrap();
45    result.try_get(0).unwrap()
46}
47
48/// Waits until the region distribution matches the expected distribution within a specified timeout.
49pub async fn wait_for_region_distribution(
50    greptime: &Pool<MySql>,
51    timeout: Duration,
52    table_name: Ident,
53    expected_region_distribution: BTreeMap<u64, HashSet<RegionId>>,
54) {
55    wait_condition_fn(
56        timeout,
57        || {
58            let greptime = greptime.clone();
59            let table_name = table_name.clone();
60            Box::pin(async move {
61                let partitions = fetch_partitions(&greptime, table_name).await.unwrap();
62                region_distribution(partitions)
63                    .into_iter()
64                    .map(|(datanode, regions)| {
65                        (datanode, regions.into_iter().collect::<HashSet<_>>())
66                    })
67                    .collect::<BTreeMap<_, _>>()
68            })
69        },
70        move |region_distribution| {
71            info!("region distribution: {:?}", region_distribution);
72            if expected_region_distribution.keys().len() != region_distribution.keys().len() {
73                return false;
74            }
75
76            for (datanode, expected_regions) in &expected_region_distribution {
77                match region_distribution.get(datanode) {
78                    Some(regions) => {
79                        if expected_regions != regions {
80                            return false;
81                        }
82                    }
83                    None => return false,
84                }
85            }
86            true
87        },
88        Duration::from_secs(5),
89    )
90    .await
91}