meta_srv/procedure/repartition/
group.rs1pub(crate) mod enter_staging_region;
16pub(crate) mod repartition_start;
17pub(crate) mod update_metadata;
18pub(crate) mod utils;
19
20use std::any::Any;
21use std::fmt::Debug;
22use std::time::Duration;
23
24use common_error::ext::BoxedError;
25use common_meta::DatanodeId;
26use common_meta::cache_invalidator::CacheInvalidatorRef;
27use common_meta::instruction::CacheIdent;
28use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
29use common_meta::key::table_route::TableRouteValue;
30use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
31use common_meta::rpc::router::RegionRoute;
32use common_procedure::{
33 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
34 UserMetadata,
35};
36use serde::{Deserialize, Serialize};
37use snafu::{OptionExt, ResultExt};
38use store_api::storage::{RegionId, TableId};
39use uuid::Uuid;
40
41use crate::error::{self, Result};
42use crate::procedure::repartition::group::repartition_start::RepartitionStart;
43use crate::procedure::repartition::plan::RegionDescriptor;
44use crate::procedure::repartition::{self};
45use crate::service::mailbox::MailboxRef;
46
47pub type GroupId = Uuid;
48
49#[allow(dead_code)]
50pub struct RepartitionGroupProcedure {
51 state: Box<dyn State>,
52 context: Context,
53}
54
55impl RepartitionGroupProcedure {
56 const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
57
58 pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
59 let state = Box::new(RepartitionStart);
60
61 Self {
62 state,
63 context: Context {
64 persistent_ctx: persistent_context,
65 cache_invalidator: context.cache_invalidator.clone(),
66 table_metadata_manager: context.table_metadata_manager.clone(),
67 mailbox: context.mailbox.clone(),
68 server_addr: context.server_addr.clone(),
69 },
70 }
71 }
72}
73
74#[async_trait::async_trait]
75impl Procedure for RepartitionGroupProcedure {
76 fn type_name(&self) -> &str {
77 Self::TYPE_NAME
78 }
79
80 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
81 todo!()
82 }
83
84 async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
85 todo!()
86 }
87
88 fn rollback_supported(&self) -> bool {
89 true
90 }
91
92 fn dump(&self) -> ProcedureResult<String> {
93 todo!()
94 }
95
96 fn lock_key(&self) -> LockKey {
97 todo!()
98 }
99
100 fn user_metadata(&self) -> Option<UserMetadata> {
101 todo!()
102 }
103}
104
105pub struct Context {
106 pub persistent_ctx: PersistentContext,
107
108 pub cache_invalidator: CacheInvalidatorRef,
109
110 pub table_metadata_manager: TableMetadataManagerRef,
111
112 pub mailbox: MailboxRef,
113
114 pub server_addr: String,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
119pub struct GroupPrepareResult {
120 pub source_routes: Vec<RegionRoute>,
122 pub target_routes: Vec<RegionRoute>,
124 pub central_region: RegionId,
126 pub central_region_datanode_id: DatanodeId,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
131pub struct PersistentContext {
132 pub group_id: GroupId,
133 pub table_id: TableId,
135 pub sources: Vec<RegionDescriptor>,
137 pub targets: Vec<RegionDescriptor>,
139 pub group_prepare_result: Option<GroupPrepareResult>,
142}
143
144impl PersistentContext {
145 pub fn new(
146 group_id: GroupId,
147 table_id: TableId,
148 sources: Vec<RegionDescriptor>,
149 targets: Vec<RegionDescriptor>,
150 ) -> Self {
151 Self {
152 group_id,
153 table_id,
154 sources,
155 targets,
156 group_prepare_result: None,
157 }
158 }
159}
160
161impl Context {
162 pub async fn get_table_route_value(
170 &self,
171 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
172 let table_id = self.persistent_ctx.table_id;
173 let group_id = self.persistent_ctx.group_id;
174 let table_route_value = self
175 .table_metadata_manager
176 .table_route_manager()
177 .table_route_storage()
178 .get_with_raw_bytes(table_id)
179 .await
180 .map_err(BoxedError::new)
181 .with_context(|_| error::RetryLaterWithSourceSnafu {
182 reason: format!(
183 "Failed to get table route for table: {}, repartition group: {}",
184 table_id, group_id
185 ),
186 })?
187 .context(error::TableRouteNotFoundSnafu { table_id })?;
188
189 Ok(table_route_value)
190 }
191
192 pub async fn get_datanode_table_value(
197 &self,
198 table_id: TableId,
199 datanode_id: u64,
200 ) -> Result<DatanodeTableValue> {
201 let datanode_table_value = self
202 .table_metadata_manager
203 .datanode_table_manager()
204 .get(&DatanodeTableKey {
205 datanode_id,
206 table_id,
207 })
208 .await
209 .context(error::TableMetadataManagerSnafu)
210 .map_err(BoxedError::new)
211 .with_context(|_| error::RetryLaterWithSourceSnafu {
212 reason: format!("Failed to get DatanodeTable: {table_id}"),
213 })?
214 .context(error::DatanodeTableNotFoundSnafu {
215 table_id,
216 datanode_id,
217 })?;
218 Ok(datanode_table_value)
219 }
220
221 pub async fn invalidate_table_cache(&self) -> Result<()> {
223 let table_id = self.persistent_ctx.table_id;
224 let group_id = self.persistent_ctx.group_id;
225 let subject = format!(
226 "Invalidate table cache for repartition table, group: {}, table: {}",
227 group_id, table_id,
228 );
229 let ctx = common_meta::cache_invalidator::Context {
230 subject: Some(subject),
231 };
232 let _ = self
233 .cache_invalidator
234 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
235 .await;
236 Ok(())
237 }
238
239 pub async fn update_table_route(
248 &self,
249 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
250 new_region_routes: Vec<RegionRoute>,
251 ) -> Result<()> {
252 let table_id = self.persistent_ctx.table_id;
253 let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
255 let central_region_datanode_table_value = self
256 .get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
257 .await?;
258 let RegionInfo {
259 region_options,
260 region_wal_options,
261 ..
262 } = ¢ral_region_datanode_table_value.region_info;
263
264 self.table_metadata_manager
265 .update_table_route(
266 table_id,
267 central_region_datanode_table_value.region_info.clone(),
268 current_table_route_value,
269 new_region_routes,
270 region_options,
271 region_wal_options,
272 )
273 .await
274 .context(error::TableMetadataManagerSnafu)
275 }
276
277 pub fn next_operation_timeout(&self) -> Option<Duration> {
281 Some(Duration::from_secs(10))
282 }
283}
284
285pub fn region_routes(
290 table_id: TableId,
291 table_route_value: &TableRouteValue,
292) -> Result<&Vec<RegionRoute>> {
293 table_route_value
294 .region_routes()
295 .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
296 err_msg: format!(
297 "TableRoute({:?}) is a non-physical TableRouteValue.",
298 table_id
299 ),
300 })
301}
302
303#[async_trait::async_trait]
304#[typetag::serde(tag = "repartition_group_state")]
305pub(crate) trait State: Sync + Send + Debug {
306 fn name(&self) -> &'static str {
307 let type_name = std::any::type_name::<Self>();
308 type_name.split("::").last().unwrap_or(type_name)
310 }
311
312 async fn next(
314 &mut self,
315 ctx: &mut Context,
316 procedure_ctx: &ProcedureContext,
317 ) -> Result<(Box<dyn State>, Status)>;
318
319 fn as_any(&self) -> &dyn Any;
320}
321
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    /// Builds a [`TableMetadataManager`] on top of a mock kv backend whose range requests
    /// always fail with a mocked `Unexpected` error.
    fn failing_table_metadata_manager() -> Arc<TableMetadataManager> {
        let failing_kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        Arc::new(TableMetadataManager::new(Arc::new(failing_kv)))
    }

    /// A missing table route is reported as a non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// A failing kv backend makes the table route lookup retryable.
    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let manager = failing_table_metadata_manager();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = manager;
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();

        assert!(err.is_retryable());
    }

    /// A failing kv backend makes the datanode table lookup retryable.
    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let manager = failing_table_metadata_manager();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = manager;
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();

        assert!(err.is_retryable());
    }
}