1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::Rows;
19use common_meta::cache::{TableRoute, TableRouteCacheRef};
20use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
21use common_meta::kv_backend::KvBackendRef;
22use common_meta::peer::Peer;
23use common_meta::rpc::router::{self, RegionRoute};
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::TableId;
27
28use crate::error::{FindLeaderSnafu, Result};
29use crate::multi_dim::MultiDimPartitionRule;
30use crate::partition::{PartitionBound, PartitionDef};
31use crate::splitter::RowSplitter;
32use crate::{error, PartitionRuleRef};
33
34#[async_trait::async_trait]
35pub trait TableRouteCacheInvalidator: Send + Sync {
36 async fn invalidate_table_route(&self, table: TableId);
37}
38
39pub type TableRouteCacheInvalidatorRef = Arc<dyn TableRouteCacheInvalidator>;
40
41pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
42
43pub struct PartitionRuleManager {
48 table_route_manager: TableRouteManager,
49 table_route_cache: TableRouteCacheRef,
50}
51
52#[derive(Debug)]
53pub struct PartitionInfo {
54 pub id: RegionId,
55 pub partition: PartitionDef,
56}
57
58impl PartitionRuleManager {
59 pub fn new(kv_backend: KvBackendRef, table_route_cache: TableRouteCacheRef) -> Self {
60 Self {
61 table_route_manager: TableRouteManager::new(kv_backend),
62 table_route_cache,
63 }
64 }
65
66 pub async fn find_physical_table_route(
67 &self,
68 table_id: TableId,
69 ) -> Result<Arc<PhysicalTableRouteValue>> {
70 match self
71 .table_route_cache
72 .get(table_id)
73 .await
74 .context(error::TableRouteManagerSnafu)?
75 .context(error::TableRouteNotFoundSnafu { table_id })?
76 .as_ref()
77 {
78 TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()),
79 TableRoute::Logical(logical_table_route) => {
80 let physical_table_id = logical_table_route.physical_table_id();
81 let physical_table_route = self
82 .table_route_cache
83 .get(physical_table_id)
84 .await
85 .context(error::TableRouteManagerSnafu)?
86 .context(error::TableRouteNotFoundSnafu { table_id })?;
87
88 let physical_table_route = physical_table_route
89 .as_physical_table_route_ref()
90 .context(error::UnexpectedSnafu{
91 err_msg: format!(
92 "Expected the physical table route, but got logical table route, table: {table_id}"
93 ),
94 })?;
95
96 Ok(physical_table_route.clone())
97 }
98 }
99 }
100
101 pub async fn batch_find_region_routes(
102 &self,
103 table_ids: &[TableId],
104 ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
105 let table_routes = self
106 .table_route_manager
107 .batch_get_physical_table_routes(table_ids)
108 .await
109 .context(error::TableRouteManagerSnafu)?;
110
111 let mut table_region_routes = HashMap::with_capacity(table_routes.len());
112
113 for (table_id, table_route) in table_routes {
114 let region_routes = table_route.region_routes;
115 table_region_routes.insert(table_id, region_routes);
116 }
117
118 Ok(table_region_routes)
119 }
120
121 pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
122 let region_routes = &self
123 .find_physical_table_route(table_id)
124 .await?
125 .region_routes;
126 ensure!(
127 !region_routes.is_empty(),
128 error::FindTableRoutesSnafu { table_id }
129 );
130
131 create_partitions_from_region_routes(table_id, region_routes)
132 }
133
134 pub async fn batch_find_table_partitions(
135 &self,
136 table_ids: &[TableId],
137 ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
138 let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
139
140 let mut results = HashMap::with_capacity(table_ids.len());
141
142 for (table_id, region_routes) in batch_region_routes {
143 results.insert(
144 table_id,
145 create_partitions_from_region_routes(table_id, ®ion_routes)?,
146 );
147 }
148
149 Ok(results)
150 }
151
152 pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result<PartitionRuleRef> {
153 let partitions = self.find_table_partitions(table_id).await?;
154
155 let partition_columns = partitions[0].partition.partition_columns();
156
157 let regions = partitions
158 .iter()
159 .map(|x| x.id.region_number())
160 .collect::<Vec<RegionNumber>>();
161
162 let exprs = partitions
163 .iter()
164 .filter_map(|x| match &x.partition.partition_bounds()[0] {
165 PartitionBound::Expr(e) => Some(e.clone()),
166 _ => None,
167 })
168 .collect::<Vec<_>>();
169
170 let rule = MultiDimPartitionRule::try_new(partition_columns.clone(), regions, exprs)?;
171 Ok(Arc::new(rule) as _)
172 }
173
174 pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
176 let region_routes = &self
177 .find_physical_table_route(region_id.table_id())
178 .await?
179 .region_routes;
180
181 router::find_region_leader(region_routes, region_id.region_number()).context(
182 FindLeaderSnafu {
183 region_id,
184 table_id: region_id.table_id(),
185 },
186 )
187 }
188
189 pub async fn split_rows(
190 &self,
191 table_id: TableId,
192 rows: Rows,
193 ) -> Result<HashMap<RegionNumber, Rows>> {
194 let partition_rule = self.find_table_partition_rule(table_id).await?;
195 RowSplitter::new(partition_rule).split(rows)
196 }
197}
198
199fn create_partitions_from_region_routes(
200 table_id: TableId,
201 region_routes: &[RegionRoute],
202) -> Result<Vec<PartitionInfo>> {
203 let mut partitions = Vec::with_capacity(region_routes.len());
204 for r in region_routes {
205 let partition = r
206 .region
207 .partition
208 .as_ref()
209 .context(error::FindRegionRoutesSnafu {
210 region_id: r.region.id,
211 table_id,
212 })?;
213 let partition_def = PartitionDef::try_from(partition)?;
214
215 let id = RegionId::new(table_id, r.region.id.region_number());
219 partitions.push(PartitionInfo {
220 id,
221 partition: partition_def,
222 });
223 }
224 partitions.sort_by(|a, b| {
225 a.partition
226 .partition_bounds()
227 .cmp(b.partition.partition_bounds())
228 });
229
230 ensure!(
231 partitions
232 .windows(2)
233 .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
234 error::InvalidTableRouteDataSnafu {
235 table_id,
236 err_msg: "partition columns of all regions are not the same"
237 }
238 );
239
240 Ok(partitions)
241}