tests_fuzz/utils/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::time::Duration;
17
18use common_telemetry::info;
19use snafu::ResultExt;
20use sqlx::MySqlPool;
21use store_api::storage::RegionId;
22
23use super::wait::wait_condition_fn;
24use crate::error::{self, Result};
25use crate::ir::Ident;
26
27#[derive(Debug, sqlx::FromRow)]
28pub struct Partition {
29    pub datanode_id: u64,
30    pub region_id: u64,
31}
32
33#[derive(Debug, sqlx::FromRow)]
34pub struct PartitionCount {
35    pub count: i64,
36}
37
38pub async fn count_partitions(db: &MySqlPool, datanode_id: u64) -> Result<PartitionCount> {
39    let sql = "select count(1) as count from information_schema.region_peers where peer_id == ?";
40    sqlx::query_as::<_, PartitionCount>(sql)
41        .bind(datanode_id)
42        .fetch_one(db)
43        .await
44        .context(error::ExecuteQuerySnafu { sql })
45}
46
47/// Returns the [Partition] of the specific `region_id`
48pub async fn fetch_partition(db: &MySqlPool, region_id: u64) -> Result<Partition> {
49    let sql = "select region_id, peer_id as datanode_id from information_schema.region_peers where region_id = ?;";
50    sqlx::query_as::<_, Partition>(sql)
51        .bind(region_id)
52        .fetch_one(db)
53        .await
54        .context(error::ExecuteQuerySnafu { sql })
55}
56
57/// Returns all [Partition] of the specific `table`
58pub async fn fetch_partitions(db: &MySqlPool, table_name: Ident) -> Result<Vec<Partition>> {
59    let sql = "select b.peer_id as datanode_id, a.greptime_partition_id as region_id
60from information_schema.partitions a left join information_schema.region_peers b
61on a.greptime_partition_id = b.region_id where a.table_name= ? order by datanode_id asc;";
62    sqlx::query_as::<_, Partition>(sql)
63        .bind(table_name.value.to_string())
64        .fetch_all(db)
65        .await
66        .context(error::ExecuteQuerySnafu { sql })
67}
68
69/// Creates a distribution map of regions to datanodes based on the provided partitions.
70///
71/// This function iterates over the provided partitions and groups the regions by their associated datanode IDs.
72pub fn region_distribution(partitions: Vec<Partition>) -> BTreeMap<u64, Vec<RegionId>> {
73    let mut distribution: BTreeMap<u64, Vec<RegionId>> = BTreeMap::new();
74    for partition in partitions {
75        distribution
76            .entry(partition.datanode_id)
77            .or_default()
78            .push(RegionId::from_u64(partition.region_id));
79    }
80
81    distribution
82}
83
84/// Pretty prints the region distribution for each datanode.
85///
86/// This function logs the number of regions for each datanode in the distribution map.
87pub fn pretty_print_region_distribution(distribution: &BTreeMap<u64, Vec<RegionId>>) {
88    for (node, regions) in distribution {
89        info!("Datanode: {node}, num of regions: {}", regions.len());
90    }
91}
92
93/// Waits until all regions are evicted from the specified datanode.
94///
95/// This function repeatedly checks the number of partitions on the specified datanode and waits until
96/// the count reaches zero or the timeout period elapses. It logs the number of partitions on each check.
97pub async fn wait_for_all_regions_evicted(
98    greptime: MySqlPool,
99    selected_datanode: u64,
100    timeout: Duration,
101) {
102    wait_condition_fn(
103        timeout,
104        || {
105            let greptime = greptime.clone();
106            Box::pin(async move {
107                let partition = count_partitions(&greptime, selected_datanode)
108                    .await
109                    .unwrap();
110                info!(
111                    "Datanode: {selected_datanode}, num of partitions: {}",
112                    partition.count
113                );
114                partition.count
115            })
116        },
117        |count| count == 0,
118        Duration::from_secs(5),
119    )
120    .await;
121}