tests_fuzz/utils/
migration.rs1use std::collections::{BTreeMap, HashSet};
16use std::time::Duration;
17
18use common_telemetry::info;
19use snafu::ResultExt;
20use sqlx::{MySql, Pool, Row};
21use store_api::storage::RegionId;
22
23use crate::error::{self};
24use crate::ir::Ident;
25use crate::utils::partition::{fetch_partitions, region_distribution};
26use crate::utils::wait::wait_condition_fn;
27
28pub async fn migrate_region(
32 e: &Pool<MySql>,
33 region_id: u64,
34 from_peer_id: u64,
35 to_peer_id: u64,
36 timeout_secs: u64,
37) -> String {
38 let sql =
39 format!("admin migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs});");
40 let result = sqlx::query(&sql)
41 .fetch_one(e)
42 .await
43 .context(error::ExecuteQuerySnafu { sql })
44 .unwrap();
45 result.try_get(0).unwrap()
46}
47
48pub async fn wait_for_region_distribution(
50 greptime: &Pool<MySql>,
51 timeout: Duration,
52 table_name: Ident,
53 expected_region_distribution: BTreeMap<u64, HashSet<RegionId>>,
54) {
55 wait_condition_fn(
56 timeout,
57 || {
58 let greptime = greptime.clone();
59 let table_name = table_name.clone();
60 Box::pin(async move {
61 let partitions = fetch_partitions(&greptime, table_name).await.unwrap();
62 region_distribution(partitions)
63 .into_iter()
64 .map(|(datanode, regions)| {
65 (datanode, regions.into_iter().collect::<HashSet<_>>())
66 })
67 .collect::<BTreeMap<_, _>>()
68 })
69 },
70 move |region_distribution| {
71 info!("region distribution: {:?}", region_distribution);
72 if expected_region_distribution.keys().len() != region_distribution.keys().len() {
73 return false;
74 }
75
76 for (datanode, expected_regions) in &expected_region_distribution {
77 match region_distribution.get(datanode) {
78 Some(regions) => {
79 if expected_regions != regions {
80 return false;
81 }
82 }
83 None => return false,
84 }
85 }
86 true
87 },
88 Duration::from_secs(5),
89 )
90 .await
91}