meta_srv/procedure/repartition/
group.rs1pub(crate) mod repartition_start;
16pub(crate) mod update_metadata;
17
18use std::any::Any;
19use std::fmt::Debug;
20
21use common_error::ext::BoxedError;
22use common_meta::DatanodeId;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::instruction::CacheIdent;
25use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
26use common_meta::key::table_route::TableRouteValue;
27use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
28use common_meta::rpc::router::RegionRoute;
29use common_procedure::{Context as ProcedureContext, Status};
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::{RegionId, TableId};
33use uuid::Uuid;
34
35use crate::error::{self, Result};
36use crate::procedure::repartition::plan::RegionDescriptor;
37
38pub type GroupId = Uuid;
39
40pub struct RepartitionGroupProcedure {}
41
42pub struct Context {
43 pub persistent_ctx: PersistentContext,
44
45 pub cache_invalidator: CacheInvalidatorRef,
46
47 pub table_metadata_manager: TableMetadataManagerRef,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
51pub struct GroupPrepareResult {
52 pub source_routes: Vec<RegionRoute>,
53 pub target_routes: Vec<RegionRoute>,
54 pub central_region: RegionId,
55 pub central_region_datanode_id: DatanodeId,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
59pub struct PersistentContext {
60 pub group_id: GroupId,
61 pub table_id: TableId,
63 pub sources: Vec<RegionDescriptor>,
65 pub targets: Vec<RegionDescriptor>,
67 pub group_prepare_result: Option<GroupPrepareResult>,
70}
71
72impl Context {
73 pub async fn get_table_route_value(
81 &self,
82 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
83 let table_id = self.persistent_ctx.table_id;
84 let group_id = self.persistent_ctx.group_id;
85 let table_route_value = self
86 .table_metadata_manager
87 .table_route_manager()
88 .table_route_storage()
89 .get_with_raw_bytes(table_id)
90 .await
91 .map_err(BoxedError::new)
92 .with_context(|_| error::RetryLaterWithSourceSnafu {
93 reason: format!(
94 "Failed to get table route for table: {}, repartition group: {}",
95 table_id, group_id
96 ),
97 })?
98 .context(error::TableRouteNotFoundSnafu { table_id })?;
99
100 Ok(table_route_value)
101 }
102
103 pub async fn get_datanode_table_value(
108 &self,
109 table_id: TableId,
110 datanode_id: u64,
111 ) -> Result<DatanodeTableValue> {
112 let datanode_table_value = self
113 .table_metadata_manager
114 .datanode_table_manager()
115 .get(&DatanodeTableKey {
116 datanode_id,
117 table_id,
118 })
119 .await
120 .context(error::TableMetadataManagerSnafu)
121 .map_err(BoxedError::new)
122 .with_context(|_| error::RetryLaterWithSourceSnafu {
123 reason: format!("Failed to get DatanodeTable: {table_id}"),
124 })?
125 .context(error::DatanodeTableNotFoundSnafu {
126 table_id,
127 datanode_id,
128 })?;
129 Ok(datanode_table_value)
130 }
131
132 pub async fn invalidate_table_cache(&self) -> Result<()> {
134 let table_id = self.persistent_ctx.table_id;
135 let group_id = self.persistent_ctx.group_id;
136 let subject = format!(
137 "Invalidate table cache for repartition table, group: {}, table: {}",
138 group_id, table_id,
139 );
140 let ctx = common_meta::cache_invalidator::Context {
141 subject: Some(subject),
142 };
143 let _ = self
144 .cache_invalidator
145 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
146 .await;
147 Ok(())
148 }
149
150 pub async fn update_table_route(
159 &self,
160 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
161 new_region_routes: Vec<RegionRoute>,
162 ) -> Result<()> {
163 let table_id = self.persistent_ctx.table_id;
164 let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
166 let central_region_datanode_table_value = self
167 .get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
168 .await?;
169 let RegionInfo {
170 region_options,
171 region_wal_options,
172 ..
173 } = ¢ral_region_datanode_table_value.region_info;
174
175 self.table_metadata_manager
176 .update_table_route(
177 table_id,
178 central_region_datanode_table_value.region_info.clone(),
179 current_table_route_value,
180 new_region_routes,
181 region_options,
182 region_wal_options,
183 )
184 .await
185 .context(error::TableMetadataManagerSnafu)
186 }
187}
188
189pub fn region_routes(
194 table_id: TableId,
195 table_route_value: &TableRouteValue,
196) -> Result<&Vec<RegionRoute>> {
197 table_route_value
198 .region_routes()
199 .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
200 err_msg: format!(
201 "TableRoute({:?}) is a non-physical TableRouteValue.",
202 table_id
203 ),
204 })
205}
206
207#[async_trait::async_trait]
208#[typetag::serde(tag = "repartition_group_state")]
209pub(crate) trait State: Sync + Send + Debug {
210 fn name(&self) -> &'static str {
211 let type_name = std::any::type_name::<Self>();
212 type_name.split("::").last().unwrap_or(type_name)
214 }
215
216 async fn next(
218 &mut self,
219 ctx: &mut Context,
220 procedure_ctx: &ProcedureContext,
221 ) -> Result<(Box<dyn State>, Status)>;
222
223 fn as_any(&self) -> &dyn Any;
224}
225
226#[cfg(test)]
227mod tests {
228 use std::assert_matches::assert_matches;
229 use std::sync::Arc;
230
231 use common_meta::key::TableMetadataManager;
232 use common_meta::kv_backend::test_util::MockKvBackendBuilder;
233
234 use crate::error::Error;
235 use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
236
237 #[tokio::test]
238 async fn test_get_table_route_value_not_found_error() {
239 let env = TestingEnv::new();
240 let persistent_context = new_persistent_context(1024, vec![], vec![]);
241 let ctx = env.create_context(persistent_context);
242 let err = ctx.get_table_route_value().await.unwrap_err();
243 assert_matches!(err, Error::TableRouteNotFound { .. });
244 assert!(!err.is_retryable());
245 }
246
247 #[tokio::test]
248 async fn test_get_table_route_value_retry_error() {
249 let kv = MockKvBackendBuilder::default()
250 .range_fn(Arc::new(|_| {
251 common_meta::error::UnexpectedSnafu {
252 err_msg: "mock err",
253 }
254 .fail()
255 }))
256 .build()
257 .unwrap();
258 let mut env = TestingEnv::new();
259 env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
260 let persistent_context = new_persistent_context(1024, vec![], vec![]);
261 let ctx = env.create_context(persistent_context);
262 let err = ctx.get_table_route_value().await.unwrap_err();
263 assert!(err.is_retryable());
264 }
265
266 #[tokio::test]
267 async fn test_get_datanode_table_value_retry_error() {
268 let kv = MockKvBackendBuilder::default()
269 .range_fn(Arc::new(|_| {
270 common_meta::error::UnexpectedSnafu {
271 err_msg: "mock err",
272 }
273 .fail()
274 }))
275 .build()
276 .unwrap();
277 let mut env = TestingEnv::new();
278 env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
279 let persistent_context = new_persistent_context(1024, vec![], vec![]);
280 let ctx = env.create_context(persistent_context);
281 let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
282 assert!(err.is_retryable());
283 }
284}