tests_fuzz/utils/
partition.rs1use std::collections::BTreeMap;
16use std::time::Duration;
17
18use common_telemetry::info;
19use snafu::ResultExt;
20use sqlx::MySqlPool;
21use store_api::storage::RegionId;
22
23use super::wait::wait_condition_fn;
24use crate::error::{self, Result};
25use crate::ir::Ident;
26
27#[derive(Debug, sqlx::FromRow)]
28pub struct Partition {
29 pub datanode_id: u64,
30 pub region_id: u64,
31}
32
33#[derive(Debug, sqlx::FromRow)]
34pub struct PartitionCount {
35 pub count: i64,
36}
37
38pub async fn count_partitions(db: &MySqlPool, datanode_id: u64) -> Result<PartitionCount> {
39 let sql = "select count(1) as count from information_schema.region_peers where peer_id == ?";
40 sqlx::query_as::<_, PartitionCount>(sql)
41 .bind(datanode_id)
42 .fetch_one(db)
43 .await
44 .context(error::ExecuteQuerySnafu { sql })
45}
46
47pub async fn fetch_partition(db: &MySqlPool, region_id: u64) -> Result<Partition> {
49 let sql = "select region_id, peer_id as datanode_id from information_schema.region_peers where region_id = ?;";
50 sqlx::query_as::<_, Partition>(sql)
51 .bind(region_id)
52 .fetch_one(db)
53 .await
54 .context(error::ExecuteQuerySnafu { sql })
55}
56
57pub async fn fetch_partitions(db: &MySqlPool, table_name: Ident) -> Result<Vec<Partition>> {
59 let sql = "select b.peer_id as datanode_id, a.greptime_partition_id as region_id
60from information_schema.partitions a left join information_schema.region_peers b
61on a.greptime_partition_id = b.region_id where a.table_name= ? order by datanode_id asc;";
62 sqlx::query_as::<_, Partition>(sql)
63 .bind(table_name.value.to_string())
64 .fetch_all(db)
65 .await
66 .context(error::ExecuteQuerySnafu { sql })
67}
68
69pub fn region_distribution(partitions: Vec<Partition>) -> BTreeMap<u64, Vec<RegionId>> {
73 let mut distribution: BTreeMap<u64, Vec<RegionId>> = BTreeMap::new();
74 for partition in partitions {
75 distribution
76 .entry(partition.datanode_id)
77 .or_default()
78 .push(RegionId::from_u64(partition.region_id));
79 }
80
81 distribution
82}
83
84pub fn pretty_print_region_distribution(distribution: &BTreeMap<u64, Vec<RegionId>>) {
88 for (node, regions) in distribution {
89 info!("Datanode: {node}, num of regions: {}", regions.len());
90 }
91}
92
93pub async fn wait_for_all_regions_evicted(
98 greptime: MySqlPool,
99 selected_datanode: u64,
100 timeout: Duration,
101) {
102 wait_condition_fn(
103 timeout,
104 || {
105 let greptime = greptime.clone();
106 Box::pin(async move {
107 let partition = count_partitions(&greptime, selected_datanode)
108 .await
109 .unwrap();
110 info!(
111 "Datanode: {selected_datanode}, num of partitions: {}",
112 partition.count
113 );
114 partition.count
115 })
116 },
117 |count| count == 0,
118 Duration::from_secs(5),
119 )
120 .await;
121}