meta_srv/procedure/repartition/
update_partition_metadata.rs1use std::any::Any;
16
17use common_meta::lock_key::TableLock;
18use common_procedure::{Context as ProcedureContext, Status};
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21use store_api::storage::TableId;
22
23use crate::error::{self, Result};
24use crate::procedure::repartition::allocate_region::AllocateRegion;
25use crate::procedure::repartition::plan::AllocationPlanEntry;
26use crate::procedure::repartition::{Context, State};
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
29pub struct PartitionMetadataUpdate {
30 #[serde(default)]
31 pub original_partition_key_indices: Vec<usize>,
32 #[serde(alias = "partition_key_indices")]
33 pub target_partition_key_indices: Vec<usize>,
34 #[serde(default)]
35 pub expect_empty_partition_key_indices: bool,
36}
37
38impl PartitionMetadataUpdate {
39 pub fn from_unpartitioned(target_partition_key_indices: Vec<usize>) -> Self {
40 Self {
41 original_partition_key_indices: vec![],
42 target_partition_key_indices,
43 expect_empty_partition_key_indices: true,
44 }
45 }
46
47 pub fn from_partitioned(
48 original_partition_key_indices: Vec<usize>,
49 target_partition_key_indices: Vec<usize>,
50 ) -> Self {
51 Self {
52 original_partition_key_indices,
53 target_partition_key_indices,
54 expect_empty_partition_key_indices: false,
55 }
56 }
57
58 fn validate_target_partition_key_indices(&self) -> Result<()> {
59 ensure!(
60 !self.target_partition_key_indices.is_empty(),
61 error::InvalidArgumentsSnafu {
62 err_msg:
63 "Repartition partition metadata update expects non-empty target partition key indices"
64 .to_string(),
65 }
66 );
67
68 Ok(())
69 }
70
71 pub fn target_partition_key_indices(
72 &self,
73 table_id: TableId,
74 current_partition_key_indices: &[usize],
75 ) -> Result<Option<Vec<usize>>> {
76 self.validate_target_partition_key_indices()?;
77 if current_partition_key_indices == self.target_partition_key_indices.as_slice() {
78 return Ok(None);
79 }
80 if current_partition_key_indices == self.original_partition_key_indices.as_slice() {
81 return Ok(Some(self.target_partition_key_indices.clone()));
82 }
83 if self.expect_empty_partition_key_indices {
84 ensure!(
85 current_partition_key_indices.is_empty(),
86 error::InvalidArgumentsSnafu {
87 err_msg: format!(
88 "Repartition partition metadata update expects an unpartitioned table, but table {} has partition key indices: {:?}",
89 table_id, current_partition_key_indices
90 ),
91 }
92 );
93 }
94
95 error::InvalidArgumentsSnafu {
96 err_msg: format!(
97 "Repartition partition metadata update expects partition key indices {:?} or {:?}, but table {} has partition key indices: {:?}",
98 self.original_partition_key_indices,
99 self.target_partition_key_indices,
100 table_id,
101 current_partition_key_indices
102 ),
103 }
104 .fail()
105 }
106
107 pub fn rollback_partition_key_indices(
108 &self,
109 table_id: TableId,
110 current_partition_key_indices: &[usize],
111 ) -> Result<Option<Vec<usize>>> {
112 self.validate_target_partition_key_indices()?;
113 if current_partition_key_indices == self.original_partition_key_indices.as_slice() {
114 return Ok(None);
115 }
116 if current_partition_key_indices == self.target_partition_key_indices.as_slice() {
117 return Ok(Some(self.original_partition_key_indices.clone()));
118 }
119
120 error::InvalidArgumentsSnafu {
121 err_msg: format!(
122 "Repartition partition metadata rollback expects partition key indices {:?} or {:?}, but table {} has partition key indices: {:?}",
123 self.original_partition_key_indices,
124 self.target_partition_key_indices,
125 table_id,
126 current_partition_key_indices
127 ),
128 }
129 .fail()
130 }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct UpdatePartitionMetadata {
135 plan_entries: Vec<AllocationPlanEntry>,
136}
137
138impl UpdatePartitionMetadata {
139 pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
140 Self { plan_entries }
141 }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde]
146impl State for UpdatePartitionMetadata {
147 async fn next(
148 &mut self,
149 ctx: &mut Context,
150 procedure_ctx: &ProcedureContext,
151 ) -> Result<(Box<dyn State>, Status)> {
152 let Some(update) = ctx.persistent_ctx.partition_metadata_update.as_ref() else {
153 return Ok((
154 Box::new(AllocateRegion::new(self.plan_entries.clone())),
155 Status::executing(false),
156 ));
157 };
158 let table_id = ctx.persistent_ctx.table_id;
159 let table_lock = TableLock::Write(table_id).into();
160 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
161 let table_info_value = ctx.get_raw_table_info_value().await?;
162 let current_partition_key_indices = &table_info_value.table_info.meta.partition_key_indices;
163 let Some(target_partition_key_indices) =
164 update.target_partition_key_indices(table_id, current_partition_key_indices)?
165 else {
166 return Ok((
167 Box::new(AllocateRegion::new(self.plan_entries.clone())),
168 Status::executing(true),
169 ));
170 };
171
172 let mut new_table_info = table_info_value.table_info.clone();
173 new_table_info.meta.partition_key_indices = target_partition_key_indices;
174 common_telemetry::info!(
175 "Update table partition metadata, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}",
176 table_id,
177 new_table_info.meta.partition_key_indices,
178 new_table_info
179 .meta
180 .partition_column_names()
181 .collect::<Vec<_>>(),
182 );
183 ctx.update_table_info(&table_info_value, table_info_value.update(new_table_info))
184 .await?;
185 ctx.invalidate_table_cache().await?;
186
187 Ok((
188 Box::new(AllocateRegion::new(self.plan_entries.clone())),
189 Status::executing(true),
190 ))
191 }
192
193 fn as_any(&self) -> &dyn Any {
194 self
195 }
196}
197
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use store_api::storage::{RegionId, TableId};

    use super::*;
    use crate::procedure::repartition::test_util::{
        TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options,
    };

    /// Creates table metadata with a single region route and returns a context
    /// whose pending update is `from_unpartitioned(vec![0])`.
    async fn new_test_context(env: &TestingEnv, table_id: TableId) -> Context {
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![test_region_route(RegionId::new(table_id, 1), "")],
            test_region_wal_options(&[1]),
        )
        .await;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let mut ctx = new_parent_context(env, node_manager, table_id);
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::from_unpartitioned(vec![0]));
        ctx
    }

    /// Overwrites the stored partition key indices of the context's table.
    async fn set_partition_key_indices(ctx: &Context, partition_key_indices: Vec<usize>) {
        let current = ctx.get_raw_table_info_value().await.unwrap();
        let mut table_info = current.table_info.clone();
        table_info.meta.partition_key_indices = partition_key_indices;
        ctx.update_table_info(&current, current.update(table_info))
            .await
            .unwrap();
    }

    /// Reads the stored partition key indices of the context's table.
    async fn partition_key_indices(ctx: &Context) -> Vec<usize> {
        ctx.get_table_info_value()
            .await
            .unwrap()
            .table_info
            .meta
            .partition_key_indices
    }

    /// An unpartitioned table receives the target indices and the step persists.
    #[tokio::test]
    async fn test_update_partition_metadata_applies_to_unpartitioned_table() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let mut state = UpdatePartitionMetadata::new(vec![]);

        let (next, status) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![0]);
    }

    /// Running the step when the table already carries the target indices
    /// leaves the indices untouched and still advances.
    #[tokio::test]
    async fn test_update_partition_metadata_replay_is_noop() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        set_partition_key_indices(&ctx, vec![0]).await;
        let mut state = UpdatePartitionMetadata::new(vec![]);

        let (next, status) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![0]);
    }

    /// An update with empty target indices is rejected without touching the table.
    #[tokio::test]
    async fn test_update_partition_metadata_rejects_empty_partition_key_indices() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::from_unpartitioned(vec![]));
        let mut state = UpdatePartitionMetadata::new(vec![]);

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("non-empty target partition key indices")
        );
        assert!(partition_key_indices(&ctx).await.is_empty());
    }

    /// An update planned for an unpartitioned table is rejected when the table
    /// carries unrelated partition key indices.
    #[tokio::test]
    async fn test_update_partition_metadata_rejects_other_partition_keys() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        set_partition_key_indices(&ctx, vec![1]).await;
        let mut state = UpdatePartitionMetadata::new(vec![]);

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(err.to_string().contains("expects an unpartitioned table"));
        assert_eq!(partition_key_indices(&ctx).await, vec![1]);
    }

    /// The step advances to `AllocateRegion` when it carries plan entries.
    #[tokio::test]
    async fn test_update_partition_metadata_preserves_plan_entries() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let plan_entries = vec![crate::procedure::repartition::plan::AllocationPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![
                crate::procedure::repartition::plan::SourceRegionDescriptor::Default {
                    region_id: RegionId::new(table_id, 1),
                },
            ],
            target_partition_exprs: vec![range_expr("x", 0, 10)],
            transition_map: vec![vec![0]],
        }];
        let mut state = UpdatePartitionMetadata::new(plan_entries);

        let (next, _) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(next.as_any().is::<AllocateRegion>());
    }

    /// A partitioned table whose indices match the recorded original ones is
    /// moved to the target indices.
    #[tokio::test]
    async fn test_update_partition_metadata_overwrites_partitioned_table() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        set_partition_key_indices(&ctx, vec![0]).await;
        ctx.persistent_ctx.partition_metadata_update = Some(
            PartitionMetadataUpdate::from_partitioned(vec![0], vec![2, 1]),
        );
        let mut state = UpdatePartitionMetadata::new(vec![]);

        let (next, status) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![2, 1]);
    }

    /// A partitioned table whose indices match neither the original nor the
    /// target ones is rejected and left unchanged.
    #[tokio::test]
    async fn test_update_partition_metadata_rejects_unexpected_partition_keys() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        set_partition_key_indices(&ctx, vec![1]).await;
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::from_partitioned(vec![0], vec![2]));
        let mut state = UpdatePartitionMetadata::new(vec![]);

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(err.to_string().contains("expects partition key indices"));
        assert_eq!(partition_key_indices(&ctx).await, vec![1]);
    }

    /// Rolling back from the target indices yields the original indices.
    #[test]
    fn test_partition_metadata_update_rollback_target_to_original() {
        let update = PartitionMetadataUpdate::from_partitioned(vec![0], vec![2, 1]);

        let rollback_indices = update
            .rollback_partition_key_indices(1024, &[2, 1])
            .unwrap();

        assert_eq!(rollback_indices, Some(vec![0]));
    }

    /// Rolling back a table that still carries the original indices is a no-op.
    #[test]
    fn test_partition_metadata_update_rollback_original_is_noop() {
        let update = PartitionMetadataUpdate::from_partitioned(vec![0], vec![2, 1]);

        let rollback_indices = update.rollback_partition_key_indices(1024, &[0]).unwrap();

        assert_eq!(rollback_indices, None);
    }

    /// Rolling back a table with unrelated indices is rejected.
    #[test]
    fn test_partition_metadata_update_rollback_rejects_unexpected_state() {
        let update = PartitionMetadataUpdate::from_partitioned(vec![0], vec![2, 1]);

        let err = update
            .rollback_partition_key_indices(1024, &[1])
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("rollback expects partition key indices")
        );
    }
}