1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::Rows;
19use common_meta::cache::{TableRoute, TableRouteCacheRef};
20use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
21use common_meta::kv_backend::KvBackendRef;
22use common_meta::peer::Peer;
23use common_meta::rpc::router::{self, RegionRoute};
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::TableId;
27
28use crate::error::{FindLeaderSnafu, Result};
29use crate::multi_dim::MultiDimPartitionRule;
30use crate::partition::{PartitionBound, PartitionDef};
31use crate::splitter::RowSplitter;
32use crate::{error, PartitionRuleRef};
33
34#[async_trait::async_trait]
35pub trait TableRouteCacheInvalidator: Send + Sync {
36 async fn invalidate_table_route(&self, table: TableId);
37}
38
39pub type TableRouteCacheInvalidatorRef = Arc<dyn TableRouteCacheInvalidator>;
40
41pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
42
43pub struct PartitionRuleManager {
48 table_route_manager: TableRouteManager,
49 table_route_cache: TableRouteCacheRef,
50}
51
52#[derive(Debug)]
53pub struct PartitionInfo {
54 pub id: RegionId,
55 pub partition: PartitionDef,
56}
57
58impl PartitionRuleManager {
59 pub fn new(kv_backend: KvBackendRef, table_route_cache: TableRouteCacheRef) -> Self {
60 Self {
61 table_route_manager: TableRouteManager::new(kv_backend),
62 table_route_cache,
63 }
64 }
65
66 pub async fn find_physical_table_route(
67 &self,
68 table_id: TableId,
69 ) -> Result<Arc<PhysicalTableRouteValue>> {
70 match self
71 .table_route_cache
72 .get(table_id)
73 .await
74 .context(error::TableRouteManagerSnafu)?
75 .context(error::TableRouteNotFoundSnafu { table_id })?
76 .as_ref()
77 {
78 TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()),
79 TableRoute::Logical(logical_table_route) => {
80 let physical_table_id = logical_table_route.physical_table_id();
81 let physical_table_route = self
82 .table_route_cache
83 .get(physical_table_id)
84 .await
85 .context(error::TableRouteManagerSnafu)?
86 .context(error::TableRouteNotFoundSnafu { table_id })?;
87
88 let physical_table_route = physical_table_route
89 .as_physical_table_route_ref()
90 .context(error::UnexpectedSnafu{
91 err_msg: format!(
92 "Expected the physical table route, but got logical table route, table: {table_id}"
93 ),
94 })?;
95
96 Ok(physical_table_route.clone())
97 }
98 }
99 }
100
101 pub async fn batch_find_region_routes(
102 &self,
103 table_ids: &[TableId],
104 ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
105 let table_routes = self
106 .table_route_manager
107 .batch_get_physical_table_routes(table_ids)
108 .await
109 .context(error::TableRouteManagerSnafu)?;
110
111 let mut table_region_routes = HashMap::with_capacity(table_routes.len());
112
113 for (table_id, table_route) in table_routes {
114 let region_routes = table_route.region_routes;
115 table_region_routes.insert(table_id, region_routes);
116 }
117
118 Ok(table_region_routes)
119 }
120
121 pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
122 let region_routes = &self
123 .find_physical_table_route(table_id)
124 .await?
125 .region_routes;
126 ensure!(
127 !region_routes.is_empty(),
128 error::FindTableRoutesSnafu { table_id }
129 );
130
131 create_partitions_from_region_routes(table_id, region_routes)
132 }
133
134 pub async fn batch_find_table_partitions(
135 &self,
136 table_ids: &[TableId],
137 ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
138 let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
139
140 let mut results = HashMap::with_capacity(table_ids.len());
141
142 for (table_id, region_routes) in batch_region_routes {
143 results.insert(
144 table_id,
145 create_partitions_from_region_routes(table_id, ®ion_routes)?,
146 );
147 }
148
149 Ok(results)
150 }
151
152 pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result<PartitionRuleRef> {
153 let partitions = self.find_table_partitions(table_id).await?;
154
155 let partition_columns = partitions[0].partition.partition_columns();
156
157 let regions = partitions
158 .iter()
159 .map(|x| x.id.region_number())
160 .collect::<Vec<RegionNumber>>();
161
162 let exprs = partitions
163 .iter()
164 .filter_map(|x| match &x.partition.partition_bounds()[0] {
165 PartitionBound::Expr(e) => Some(e.clone()),
166 _ => None,
167 })
168 .collect::<Vec<_>>();
169
170 let rule =
171 MultiDimPartitionRule::try_new(partition_columns.clone(), regions, exprs, false)?;
172 Ok(Arc::new(rule) as _)
173 }
174
175 pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
177 let region_routes = &self
178 .find_physical_table_route(region_id.table_id())
179 .await?
180 .region_routes;
181
182 router::find_region_leader(region_routes, region_id.region_number()).context(
183 FindLeaderSnafu {
184 region_id,
185 table_id: region_id.table_id(),
186 },
187 )
188 }
189
190 pub async fn split_rows(
191 &self,
192 table_id: TableId,
193 rows: Rows,
194 ) -> Result<HashMap<RegionNumber, Rows>> {
195 let partition_rule = self.find_table_partition_rule(table_id).await?;
196 RowSplitter::new(partition_rule).split(rows)
197 }
198}
199
200fn create_partitions_from_region_routes(
201 table_id: TableId,
202 region_routes: &[RegionRoute],
203) -> Result<Vec<PartitionInfo>> {
204 let mut partitions = Vec::with_capacity(region_routes.len());
205 for r in region_routes {
206 let partition = r
207 .region
208 .partition
209 .as_ref()
210 .context(error::FindRegionRoutesSnafu {
211 region_id: r.region.id,
212 table_id,
213 })?;
214 let partition_def = PartitionDef::try_from(partition)?;
215
216 let id = RegionId::new(table_id, r.region.id.region_number());
220 partitions.push(PartitionInfo {
221 id,
222 partition: partition_def,
223 });
224 }
225 partitions.sort_by(|a, b| {
226 a.partition
227 .partition_bounds()
228 .cmp(b.partition.partition_bounds())
229 });
230
231 ensure!(
232 partitions
233 .windows(2)
234 .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
235 error::InvalidTableRouteDataSnafu {
236 table_id,
237 err_msg: "partition columns of all regions are not the same"
238 }
239 );
240
241 Ok(partitions)
242}