1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::Rows;
19use common_meta::cache::{TableRoute, TableRouteCacheRef};
20use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
21use common_meta::kv_backend::KvBackendRef;
22use common_meta::peer::Peer;
23use common_meta::rpc::router::{self, RegionRoute};
24use snafu::{OptionExt, ResultExt};
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{TableId, TableInfo};
27
28use crate::cache::{CachedPartitionInfo, PartitionInfoCacheRef, PhysicalPartitionInfo};
29use crate::error::{FindLeaderSnafu, Result};
30use crate::expr::PartitionExpr;
31use crate::multi_dim::MultiDimPartitionRule;
32use crate::splitter::RowSplitter;
33use crate::{PartitionRuleRef, error};
34
/// Reference-counted ([`Arc`]) handle to a [`PartitionRuleManager`].
pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
36
/// Looks up table routes and partition information for tables.
///
/// Single-table route lookups are answered from `table_route_cache`, batch
/// route lookups go through `table_route_manager`, and partition metadata is
/// read from `partition_info_cache`.
pub struct PartitionRuleManager {
    /// Route manager built from the KV backend handed to `new`; used for
    /// batch physical-route lookups.
    table_route_manager: TableRouteManager,
    /// Cache queried by table id for a table's (physical or logical) route.
    table_route_cache: TableRouteCacheRef,
    /// Cache queried by table id for a table's partition information.
    partition_info_cache: PartitionInfoCacheRef,
}
46
/// A region together with its (optional) partition expression.
#[derive(Debug, Clone)]
pub struct PartitionInfo {
    /// Id of the region this partition maps to.
    pub id: RegionId,
    /// Partition expression of the region; `None` when the region has none.
    pub partition_expr: Option<PartitionExpr>,
}
52
/// Like [`PartitionInfo`], but additionally carries an optional version
/// number for the partition expression.
#[derive(Debug, Clone)]
pub struct PartitionInfoWithVersion {
    /// Id of the region this partition maps to.
    pub id: RegionId,
    /// Partition expression of the region; `None` when the region has none.
    pub partition_expr: Option<PartitionExpr>,
    /// Version associated with `partition_expr`, if any.
    // NOTE(review): the producer of this value is not visible here — confirm
    // what a missing version means for callers.
    pub partition_expr_version: Option<u64>,
}
59
60impl PartitionRuleManager {
61 pub fn new(
62 kv_backend: KvBackendRef,
63 table_route_cache: TableRouteCacheRef,
64 partition_info_cache: PartitionInfoCacheRef,
65 ) -> Self {
66 Self {
67 table_route_manager: TableRouteManager::new(kv_backend),
68 table_route_cache,
69 partition_info_cache,
70 }
71 }
72
73 pub async fn find_physical_table_route(
74 &self,
75 table_id: TableId,
76 ) -> Result<Arc<PhysicalTableRouteValue>> {
77 match self
78 .table_route_cache
79 .get(table_id)
80 .await
81 .context(error::TableRouteManagerSnafu)?
82 .context(error::TableRouteNotFoundSnafu { table_id })?
83 .as_ref()
84 {
85 TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()),
86 TableRoute::Logical(logical_table_route) => {
87 let physical_table_id = logical_table_route.physical_table_id();
88 let physical_table_route = self
89 .table_route_cache
90 .get(physical_table_id)
91 .await
92 .context(error::TableRouteManagerSnafu)?
93 .context(error::TableRouteNotFoundSnafu { table_id })?;
94
95 let physical_table_route = physical_table_route
96 .as_physical_table_route_ref()
97 .context(error::UnexpectedSnafu{
98 err_msg: format!(
99 "Expected the physical table route, but got logical table route, table: {table_id}"
100 ),
101 })?;
102
103 Ok(physical_table_route.clone())
104 }
105 }
106 }
107
108 pub async fn batch_find_region_routes(
109 &self,
110 table_ids: &[TableId],
111 ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
112 let table_routes = self
113 .table_route_manager
114 .batch_get_physical_table_routes(table_ids)
115 .await
116 .context(error::TableRouteManagerSnafu)?;
117
118 let mut table_region_routes = HashMap::with_capacity(table_routes.len());
119
120 for (table_id, table_route) in table_routes {
121 let region_routes = table_route.region_routes;
122 table_region_routes.insert(table_id, region_routes);
123 }
124
125 Ok(table_region_routes)
126 }
127
128 pub async fn find_physical_partition_info(
130 &self,
131 table_id: TableId,
132 ) -> Result<Arc<PhysicalPartitionInfo>> {
133 let cached = self
134 .partition_info_cache
135 .get(table_id)
136 .await
137 .context(error::GetPartitionInfoSnafu)?
138 .context(error::TableRouteNotFoundSnafu { table_id })?;
139 match cached {
140 CachedPartitionInfo::Physical(info) => Ok(info),
141 CachedPartitionInfo::Logical(physical_table_id) => {
142 let cached = self
143 .partition_info_cache
144 .get(physical_table_id)
145 .await
146 .context(error::GetPartitionInfoSnafu)?
147 .context(error::TableRouteNotFoundSnafu {
148 table_id: physical_table_id,
149 })?;
150 let info = cached.into_physical().context(error::UnexpectedSnafu{
151 err_msg: format!(
152 "Expected the physical partition info, but got logical partable route, table: {physical_table_id}"
153 )
154 })?;
155
156 Ok(info)
157 }
158 }
159 }
160
161 pub async fn batch_find_table_partitions(
162 &self,
163 table_ids: &[TableId],
164 ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
165 let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
166
167 let mut results = HashMap::with_capacity(table_ids.len());
168
169 for (table_id, region_routes) in batch_region_routes {
170 results.insert(
171 table_id,
172 create_partitions_from_region_routes(table_id, ®ion_routes)?,
173 );
174 }
175
176 Ok(results)
177 }
178
179 pub async fn find_table_partition_rule(
180 &self,
181 table_info: &TableInfo,
182 ) -> Result<(PartitionRuleRef, HashMap<RegionNumber, Option<u64>>)> {
183 let partition_columns = table_info
184 .meta
185 .partition_column_names()
186 .cloned()
187 .collect::<Vec<_>>();
188
189 let partition_info = self
190 .find_physical_partition_info(table_info.table_id())
191 .await?;
192 let partition_versions = partition_info
193 .partitions
194 .iter()
195 .map(|r| (r.id.region_number(), r.partition_expr_version))
196 .collect::<HashMap<RegionNumber, Option<u64>>>();
197 let regions = partition_info
198 .partitions
199 .iter()
200 .map(|x| x.id.region_number())
201 .collect::<Vec<RegionNumber>>();
202 let exprs = partition_info
203 .partitions
204 .iter()
205 .filter_map(|x| x.partition_expr.as_ref())
206 .cloned()
207 .collect::<Vec<_>>();
208 let partition_rule = Arc::new(MultiDimPartitionRule::try_new(
209 partition_columns,
210 regions,
211 exprs,
212 false,
213 )?) as _;
214 Ok((partition_rule, partition_versions))
215 }
216
217 pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
219 let region_routes = &self
220 .find_physical_table_route(region_id.table_id())
221 .await?
222 .region_routes;
223
224 router::find_region_leader(region_routes, region_id.region_number()).context(
225 FindLeaderSnafu {
226 region_id,
227 table_id: region_id.table_id(),
228 },
229 )
230 }
231
232 pub async fn split_rows(
233 &self,
234 table_info: &TableInfo,
235 rows: Rows,
236 ) -> Result<HashMap<RegionNumber, (Rows, Option<u64>)>> {
237 let (partition_rule, partition_versions) =
238 self.find_table_partition_rule(table_info).await?;
239
240 let result = RowSplitter::new(partition_rule)
241 .split(rows)?
242 .into_iter()
243 .map(|(region_number, rows)| {
244 (
245 region_number,
246 (
247 rows,
248 partition_versions
249 .get(®ion_number)
250 .copied()
251 .unwrap_or_default(),
252 ),
253 )
254 })
255 .collect::<HashMap<_, _>>();
256
257 Ok(result)
258 }
259}
260
261pub fn create_partitions_from_region_routes(
263 table_id: TableId,
264 region_routes: &[RegionRoute],
265) -> Result<Vec<PartitionInfo>> {
266 let mut partitions = Vec::with_capacity(region_routes.len());
267 for r in region_routes {
268 let partition_expr = PartitionExpr::from_json_str(&r.region.partition_expr())?;
269
270 let id = RegionId::new(table_id, r.region.id.region_number());
274 partitions.push(PartitionInfo { id, partition_expr });
275 }
276
277 Ok(partitions)
278}