1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::Rows;
19use common_meta::cache::{TableRoute, TableRouteCacheRef};
20use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
21use common_meta::kv_backend::KvBackendRef;
22use common_meta::peer::Peer;
23use common_meta::rpc::router::{self, RegionRoute};
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{TableId, TableInfo};
27
28use crate::error::{FindLeaderSnafu, Result};
29use crate::expr::PartitionExpr;
30use crate::multi_dim::MultiDimPartitionRule;
31use crate::splitter::RowSplitter;
32use crate::{error, PartitionRuleRef};
33
34#[async_trait::async_trait]
35pub trait TableRouteCacheInvalidator: Send + Sync {
36 async fn invalidate_table_route(&self, table: TableId);
37}
38
39pub type TableRouteCacheInvalidatorRef = Arc<dyn TableRouteCacheInvalidator>;
40
41pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
42
43pub struct PartitionRuleManager {
48 table_route_manager: TableRouteManager,
49 table_route_cache: TableRouteCacheRef,
50}
51
52#[derive(Debug)]
53pub struct PartitionInfo {
54 pub id: RegionId,
55 pub partition_expr: Option<PartitionExpr>,
56}
57
58impl PartitionRuleManager {
59 pub fn new(kv_backend: KvBackendRef, table_route_cache: TableRouteCacheRef) -> Self {
60 Self {
61 table_route_manager: TableRouteManager::new(kv_backend),
62 table_route_cache,
63 }
64 }
65
66 pub async fn find_physical_table_route(
67 &self,
68 table_id: TableId,
69 ) -> Result<Arc<PhysicalTableRouteValue>> {
70 match self
71 .table_route_cache
72 .get(table_id)
73 .await
74 .context(error::TableRouteManagerSnafu)?
75 .context(error::TableRouteNotFoundSnafu { table_id })?
76 .as_ref()
77 {
78 TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()),
79 TableRoute::Logical(logical_table_route) => {
80 let physical_table_id = logical_table_route.physical_table_id();
81 let physical_table_route = self
82 .table_route_cache
83 .get(physical_table_id)
84 .await
85 .context(error::TableRouteManagerSnafu)?
86 .context(error::TableRouteNotFoundSnafu { table_id })?;
87
88 let physical_table_route = physical_table_route
89 .as_physical_table_route_ref()
90 .context(error::UnexpectedSnafu{
91 err_msg: format!(
92 "Expected the physical table route, but got logical table route, table: {table_id}"
93 ),
94 })?;
95
96 Ok(physical_table_route.clone())
97 }
98 }
99 }
100
101 pub async fn batch_find_region_routes(
102 &self,
103 table_ids: &[TableId],
104 ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
105 let table_routes = self
106 .table_route_manager
107 .batch_get_physical_table_routes(table_ids)
108 .await
109 .context(error::TableRouteManagerSnafu)?;
110
111 let mut table_region_routes = HashMap::with_capacity(table_routes.len());
112
113 for (table_id, table_route) in table_routes {
114 let region_routes = table_route.region_routes;
115 table_region_routes.insert(table_id, region_routes);
116 }
117
118 Ok(table_region_routes)
119 }
120
121 pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
122 let region_routes = &self
123 .find_physical_table_route(table_id)
124 .await?
125 .region_routes;
126 ensure!(
127 !region_routes.is_empty(),
128 error::FindTableRoutesSnafu { table_id }
129 );
130
131 create_partitions_from_region_routes(table_id, region_routes)
132 }
133
134 pub async fn batch_find_table_partitions(
135 &self,
136 table_ids: &[TableId],
137 ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
138 let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
139
140 let mut results = HashMap::with_capacity(table_ids.len());
141
142 for (table_id, region_routes) in batch_region_routes {
143 results.insert(
144 table_id,
145 create_partitions_from_region_routes(table_id, ®ion_routes)?,
146 );
147 }
148
149 Ok(results)
150 }
151
152 pub async fn find_table_partition_rule(
153 &self,
154 table_info: &TableInfo,
155 ) -> Result<PartitionRuleRef> {
156 let partitions = self.find_table_partitions(table_info.table_id()).await?;
157 let regions = partitions
158 .iter()
159 .map(|x| x.id.region_number())
160 .collect::<Vec<RegionNumber>>();
161
162 let exprs = partitions
163 .iter()
164 .filter_map(|x| x.partition_expr.as_ref())
165 .cloned()
166 .collect::<Vec<_>>();
167
168 let partition_columns = table_info
169 .meta
170 .partition_column_names()
171 .cloned()
172 .collect::<Vec<_>>();
173 let rule = MultiDimPartitionRule::try_new(partition_columns, regions, exprs, false)?;
174 Ok(Arc::new(rule) as _)
175 }
176
177 pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
179 let region_routes = &self
180 .find_physical_table_route(region_id.table_id())
181 .await?
182 .region_routes;
183
184 router::find_region_leader(region_routes, region_id.region_number()).context(
185 FindLeaderSnafu {
186 region_id,
187 table_id: region_id.table_id(),
188 },
189 )
190 }
191
192 pub async fn split_rows(
193 &self,
194 table_info: &TableInfo,
195 rows: Rows,
196 ) -> Result<HashMap<RegionNumber, Rows>> {
197 let partition_rule = self.find_table_partition_rule(table_info).await?;
198 RowSplitter::new(partition_rule).split(rows)
199 }
200}
201
202fn create_partitions_from_region_routes(
203 table_id: TableId,
204 region_routes: &[RegionRoute],
205) -> Result<Vec<PartitionInfo>> {
206 let mut partitions = Vec::with_capacity(region_routes.len());
207 for r in region_routes {
208 let partition_expr = PartitionExpr::from_json_str(&r.region.partition_expr())?;
209
210 let id = RegionId::new(table_id, r.region.id.region_number());
214 partitions.push(PartitionInfo { id, partition_expr });
215 }
216
217 Ok(partitions)
218}