partition/
manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::Rows;
19use common_meta::cache::{TableRoute, TableRouteCacheRef};
20use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
21use common_meta::kv_backend::KvBackendRef;
22use common_meta::peer::Peer;
23use common_meta::rpc::router::{self, RegionRoute};
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::TableId;
27
28use crate::error::{FindLeaderSnafu, Result};
29use crate::multi_dim::MultiDimPartitionRule;
30use crate::partition::{PartitionBound, PartitionDef};
31use crate::splitter::RowSplitter;
32use crate::{error, PartitionRuleRef};
33
34#[async_trait::async_trait]
35pub trait TableRouteCacheInvalidator: Send + Sync {
36    async fn invalidate_table_route(&self, table: TableId);
37}
38
39pub type TableRouteCacheInvalidatorRef = Arc<dyn TableRouteCacheInvalidator>;
40
41pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
42
43/// PartitionRuleManager manages the table routes and partition rules.
44/// It provides methods to find regions by:
45/// - values (in case of insertion)
46/// - filters (in case of select, deletion and update)
47pub struct PartitionRuleManager {
48    table_route_manager: TableRouteManager,
49    table_route_cache: TableRouteCacheRef,
50}
51
52#[derive(Debug)]
53pub struct PartitionInfo {
54    pub id: RegionId,
55    pub partition: PartitionDef,
56}
57
58impl PartitionRuleManager {
59    pub fn new(kv_backend: KvBackendRef, table_route_cache: TableRouteCacheRef) -> Self {
60        Self {
61            table_route_manager: TableRouteManager::new(kv_backend),
62            table_route_cache,
63        }
64    }
65
66    pub async fn find_physical_table_route(
67        &self,
68        table_id: TableId,
69    ) -> Result<Arc<PhysicalTableRouteValue>> {
70        match self
71            .table_route_cache
72            .get(table_id)
73            .await
74            .context(error::TableRouteManagerSnafu)?
75            .context(error::TableRouteNotFoundSnafu { table_id })?
76            .as_ref()
77        {
78            TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()),
79            TableRoute::Logical(logical_table_route) => {
80                let physical_table_id = logical_table_route.physical_table_id();
81                let physical_table_route = self
82                    .table_route_cache
83                    .get(physical_table_id)
84                    .await
85                    .context(error::TableRouteManagerSnafu)?
86                    .context(error::TableRouteNotFoundSnafu { table_id })?;
87
88                let physical_table_route = physical_table_route
89                    .as_physical_table_route_ref()
90                    .context(error::UnexpectedSnafu{
91                        err_msg: format!(
92                            "Expected the physical table route, but got logical table route, table: {table_id}"
93                        ),
94                    })?;
95
96                Ok(physical_table_route.clone())
97            }
98        }
99    }
100
101    pub async fn batch_find_region_routes(
102        &self,
103        table_ids: &[TableId],
104    ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
105        let table_routes = self
106            .table_route_manager
107            .batch_get_physical_table_routes(table_ids)
108            .await
109            .context(error::TableRouteManagerSnafu)?;
110
111        let mut table_region_routes = HashMap::with_capacity(table_routes.len());
112
113        for (table_id, table_route) in table_routes {
114            let region_routes = table_route.region_routes;
115            table_region_routes.insert(table_id, region_routes);
116        }
117
118        Ok(table_region_routes)
119    }
120
121    pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
122        let region_routes = &self
123            .find_physical_table_route(table_id)
124            .await?
125            .region_routes;
126        ensure!(
127            !region_routes.is_empty(),
128            error::FindTableRoutesSnafu { table_id }
129        );
130
131        create_partitions_from_region_routes(table_id, region_routes)
132    }
133
134    pub async fn batch_find_table_partitions(
135        &self,
136        table_ids: &[TableId],
137    ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
138        let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
139
140        let mut results = HashMap::with_capacity(table_ids.len());
141
142        for (table_id, region_routes) in batch_region_routes {
143            results.insert(
144                table_id,
145                create_partitions_from_region_routes(table_id, &region_routes)?,
146            );
147        }
148
149        Ok(results)
150    }
151
152    pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result<PartitionRuleRef> {
153        let partitions = self.find_table_partitions(table_id).await?;
154
155        let partition_columns = partitions[0].partition.partition_columns();
156
157        let regions = partitions
158            .iter()
159            .map(|x| x.id.region_number())
160            .collect::<Vec<RegionNumber>>();
161
162        let exprs = partitions
163            .iter()
164            .filter_map(|x| match &x.partition.partition_bounds()[0] {
165                PartitionBound::Expr(e) => Some(e.clone()),
166                _ => None,
167            })
168            .collect::<Vec<_>>();
169
170        let rule =
171            MultiDimPartitionRule::try_new(partition_columns.clone(), regions, exprs, false)?;
172        Ok(Arc::new(rule) as _)
173    }
174
175    /// Find the leader of the region.
176    pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
177        let region_routes = &self
178            .find_physical_table_route(region_id.table_id())
179            .await?
180            .region_routes;
181
182        router::find_region_leader(region_routes, region_id.region_number()).context(
183            FindLeaderSnafu {
184                region_id,
185                table_id: region_id.table_id(),
186            },
187        )
188    }
189
190    pub async fn split_rows(
191        &self,
192        table_id: TableId,
193        rows: Rows,
194    ) -> Result<HashMap<RegionNumber, Rows>> {
195        let partition_rule = self.find_table_partition_rule(table_id).await?;
196        RowSplitter::new(partition_rule).split(rows)
197    }
198}
199
200fn create_partitions_from_region_routes(
201    table_id: TableId,
202    region_routes: &[RegionRoute],
203) -> Result<Vec<PartitionInfo>> {
204    let mut partitions = Vec::with_capacity(region_routes.len());
205    for r in region_routes {
206        let partition = r
207            .region
208            .partition
209            .as_ref()
210            .context(error::FindRegionRoutesSnafu {
211                region_id: r.region.id,
212                table_id,
213            })?;
214        let partition_def = PartitionDef::try_from(partition)?;
215
216        // The region routes belong to the physical table but are shared among all logical tables.
217        // That it to say, the region id points to the physical table, so we need to use the actual
218        // table id (which may be a logical table) to renew the region id.
219        let id = RegionId::new(table_id, r.region.id.region_number());
220        partitions.push(PartitionInfo {
221            id,
222            partition: partition_def,
223        });
224    }
225    partitions.sort_by(|a, b| {
226        a.partition
227            .partition_bounds()
228            .cmp(b.partition.partition_bounds())
229    });
230
231    ensure!(
232        partitions
233            .windows(2)
234            .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
235        error::InvalidTableRouteDataSnafu {
236            table_id,
237            err_msg: "partition columns of all regions are not the same"
238        }
239    );
240
241    Ok(partitions)
242}