Skip to main content

meta_srv/procedure/repartition/
update_partition_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::lock_key::TableLock;
18use common_procedure::{Context as ProcedureContext, Status};
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21use store_api::storage::TableId;
22
23use crate::error::{self, Result};
24use crate::procedure::repartition::allocate_region::AllocateRegion;
25use crate::procedure::repartition::plan::AllocationPlanEntry;
26use crate::procedure::repartition::{Context, State};
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
29pub struct PartitionMetadataUpdate {
30    #[serde(default)]
31    pub original_partition_key_indices: Vec<usize>,
32    #[serde(alias = "partition_key_indices")]
33    pub target_partition_key_indices: Vec<usize>,
34    #[serde(default)]
35    pub expect_empty_partition_key_indices: bool,
36}
37
38impl PartitionMetadataUpdate {
39    pub fn from_unpartitioned(target_partition_key_indices: Vec<usize>) -> Self {
40        Self {
41            original_partition_key_indices: vec![],
42            target_partition_key_indices,
43            expect_empty_partition_key_indices: true,
44        }
45    }
46
47    pub fn from_partitioned(
48        original_partition_key_indices: Vec<usize>,
49        target_partition_key_indices: Vec<usize>,
50    ) -> Self {
51        Self {
52            original_partition_key_indices,
53            target_partition_key_indices,
54            expect_empty_partition_key_indices: false,
55        }
56    }
57
58    fn validate_target_partition_key_indices(&self) -> Result<()> {
59        ensure!(
60            !self.target_partition_key_indices.is_empty(),
61            error::InvalidArgumentsSnafu {
62                err_msg:
63                    "Repartition partition metadata update expects non-empty target partition key indices"
64                        .to_string(),
65            }
66        );
67
68        Ok(())
69    }
70
71    pub fn target_partition_key_indices(
72        &self,
73        table_id: TableId,
74        current_partition_key_indices: &[usize],
75    ) -> Result<Option<Vec<usize>>> {
76        self.validate_target_partition_key_indices()?;
77        if current_partition_key_indices == self.target_partition_key_indices.as_slice() {
78            return Ok(None);
79        }
80        if current_partition_key_indices == self.original_partition_key_indices.as_slice() {
81            return Ok(Some(self.target_partition_key_indices.clone()));
82        }
83        if self.expect_empty_partition_key_indices {
84            ensure!(
85                current_partition_key_indices.is_empty(),
86                error::InvalidArgumentsSnafu {
87                    err_msg: format!(
88                        "Repartition partition metadata update expects an unpartitioned table, but table {} has partition key indices: {:?}",
89                        table_id, current_partition_key_indices
90                    ),
91                }
92            );
93        }
94
95        error::InvalidArgumentsSnafu {
96            err_msg: format!(
97                "Repartition partition metadata update expects partition key indices {:?} or {:?}, but table {} has partition key indices: {:?}",
98                self.original_partition_key_indices,
99                self.target_partition_key_indices,
100                table_id,
101                current_partition_key_indices
102            ),
103        }
104        .fail()
105    }
106
107    pub fn rollback_partition_key_indices(
108        &self,
109        table_id: TableId,
110        current_partition_key_indices: &[usize],
111    ) -> Result<Option<Vec<usize>>> {
112        self.validate_target_partition_key_indices()?;
113        if current_partition_key_indices == self.original_partition_key_indices.as_slice() {
114            return Ok(None);
115        }
116        if current_partition_key_indices == self.target_partition_key_indices.as_slice() {
117            return Ok(Some(self.original_partition_key_indices.clone()));
118        }
119
120        error::InvalidArgumentsSnafu {
121            err_msg: format!(
122                "Repartition partition metadata rollback expects partition key indices {:?} or {:?}, but table {} has partition key indices: {:?}",
123                self.original_partition_key_indices,
124                self.target_partition_key_indices,
125                table_id,
126                current_partition_key_indices
127            ),
128        }
129        .fail()
130    }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct UpdatePartitionMetadata {
135    plan_entries: Vec<AllocationPlanEntry>,
136}
137
138impl UpdatePartitionMetadata {
139    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
140        Self { plan_entries }
141    }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde]
146impl State for UpdatePartitionMetadata {
147    async fn next(
148        &mut self,
149        ctx: &mut Context,
150        procedure_ctx: &ProcedureContext,
151    ) -> Result<(Box<dyn State>, Status)> {
152        let Some(update) = ctx.persistent_ctx.partition_metadata_update.as_ref() else {
153            return Ok((
154                Box::new(AllocateRegion::new(self.plan_entries.clone())),
155                Status::executing(false),
156            ));
157        };
158        let table_id = ctx.persistent_ctx.table_id;
159        let table_lock = TableLock::Write(table_id).into();
160        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
161        let table_info_value = ctx.get_raw_table_info_value().await?;
162        let current_partition_key_indices = &table_info_value.table_info.meta.partition_key_indices;
163        let Some(target_partition_key_indices) =
164            update.target_partition_key_indices(table_id, current_partition_key_indices)?
165        else {
166            return Ok((
167                Box::new(AllocateRegion::new(self.plan_entries.clone())),
168                Status::executing(true),
169            ));
170        };
171
172        let mut new_table_info = table_info_value.table_info.clone();
173        new_table_info.meta.partition_key_indices = target_partition_key_indices;
174        common_telemetry::info!(
175            "Update table partition metadata, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}",
176            table_id,
177            new_table_info.meta.partition_key_indices,
178            new_table_info
179                .meta
180                .partition_column_names()
181                .collect::<Vec<_>>(),
182        );
183        ctx.update_table_info(&table_info_value, table_info_value.update(new_table_info))
184            .await?;
185        ctx.invalidate_table_cache().await?;
186
187        Ok((
188            Box::new(AllocateRegion::new(self.plan_entries.clone())),
189            Status::executing(true),
190        ))
191    }
192
193    fn as_any(&self) -> &dyn Any {
194        self
195    }
196}
197
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use store_api::storage::{RegionId, TableId};

    use super::*;
    use crate::procedure::repartition::plan::SourceRegionDescriptor;
    use crate::procedure::repartition::test_util::{
        TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options,
    };

    /// Table id shared by every test in this module.
    const TABLE_ID: TableId = 1024;

    /// Creates table metadata with a single region and returns a context whose
    /// pending update targets partition key indices `[0]` from an
    /// unpartitioned table.
    async fn new_test_context(env: &TestingEnv, table_id: TableId) -> Context {
        let region_routes = vec![test_region_route(RegionId::new(table_id, 1), "")];
        env.create_physical_table_metadata_for_repartition(
            table_id,
            region_routes,
            test_region_wal_options(&[1]),
        )
        .await;

        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let mut ctx = new_parent_context(env, node_manager, table_id);
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::from_unpartitioned(vec![0]));
        ctx
    }

    /// Overwrites the stored partition key indices of the table behind `ctx`.
    async fn set_partition_key_indices(ctx: &Context, partition_key_indices: Vec<usize>) {
        let stored = ctx.get_raw_table_info_value().await.unwrap();
        let mut edited = stored.table_info.clone();
        edited.meta.partition_key_indices = partition_key_indices;
        ctx.update_table_info(&stored, stored.update(edited))
            .await
            .unwrap();
    }

    /// Reads the stored partition key indices of the table behind `ctx`.
    async fn partition_key_indices(ctx: &Context) -> Vec<usize> {
        let value = ctx.get_table_info_value().await.unwrap();
        value.table_info.meta.partition_key_indices
    }

    /// Runs one step of a fresh `UpdatePartitionMetadata` state over `ctx`.
    async fn advance(
        ctx: &mut Context,
        plan_entries: Vec<AllocationPlanEntry>,
    ) -> Result<(Box<dyn State>, Status)> {
        let mut state = UpdatePartitionMetadata::new(plan_entries);
        let procedure_ctx = TestingEnv::procedure_context();
        state.next(ctx, &procedure_ctx).await
    }

    /// Update fixture used by the rollback tests: `[0]` -> `[2, 1]`.
    fn partitioned_update() -> PartitionMetadataUpdate {
        PartitionMetadataUpdate::from_partitioned(vec![0], vec![2, 1])
    }

    #[tokio::test]
    async fn test_update_partition_metadata_applies_to_unpartitioned_table() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;

        let (next, status) = advance(&mut ctx, Vec::new()).await.unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![0]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_replay_is_noop() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;
        // The table already carries the target indices, as after a replay.
        set_partition_key_indices(&ctx, vec![0]).await;

        let (next, status) = advance(&mut ctx, Vec::new()).await.unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![0]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_rejects_empty_partition_key_indices() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::from_unpartitioned(Vec::new()));

        let err = advance(&mut ctx, Vec::new()).await.unwrap_err();

        let message = err.to_string();
        assert!(message.contains("non-empty target partition key indices"));
        assert!(partition_key_indices(&ctx).await.is_empty());
    }

    #[tokio::test]
    async fn test_update_partition_metadata_rejects_other_partition_keys() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;
        set_partition_key_indices(&ctx, vec![1]).await;

        let err = advance(&mut ctx, Vec::new()).await.unwrap_err();

        assert!(err.to_string().contains("expects an unpartitioned table"));
        assert_eq!(partition_key_indices(&ctx).await, vec![1]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_preserves_plan_entries() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;
        let entry = AllocationPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![SourceRegionDescriptor::Default {
                region_id: RegionId::new(TABLE_ID, 1),
            }],
            target_partition_exprs: vec![range_expr("x", 0, 10)],
            transition_map: vec![vec![0]],
        };

        let (next, _) = advance(&mut ctx, vec![entry]).await.unwrap();

        // Only the type of the next state is asserted here.
        assert!(next.as_any().is::<AllocateRegion>());
    }

    #[tokio::test]
    async fn test_update_partition_metadata_overwrites_partitioned_table() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;
        set_partition_key_indices(&ctx, vec![0]).await;
        ctx.persistent_ctx.partition_metadata_update = Some(partitioned_update());

        let (next, status) = advance(&mut ctx, Vec::new()).await.unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![2, 1]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_rejects_unexpected_partition_keys() {
        let env = TestingEnv::new();
        let mut ctx = new_test_context(&env, TABLE_ID).await;
        set_partition_key_indices(&ctx, vec![1]).await;
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::from_partitioned(vec![0], vec![2]));

        let err = advance(&mut ctx, Vec::new()).await.unwrap_err();

        assert!(err.to_string().contains("expects partition key indices"));
        assert_eq!(partition_key_indices(&ctx).await, vec![1]);
    }

    #[test]
    fn test_partition_metadata_update_rollback_target_to_original() {
        let rollback = partitioned_update()
            .rollback_partition_key_indices(TABLE_ID, &[2, 1])
            .unwrap();

        assert_eq!(rollback, Some(vec![0]));
    }

    #[test]
    fn test_partition_metadata_update_rollback_original_is_noop() {
        let rollback = partitioned_update()
            .rollback_partition_key_indices(TABLE_ID, &[0])
            .unwrap();

        assert!(rollback.is_none());
    }

    #[test]
    fn test_partition_metadata_update_rollback_rejects_unexpected_state() {
        let err = partitioned_update()
            .rollback_partition_key_indices(TABLE_ID, &[1])
            .unwrap_err();

        let message = err.to_string();
        assert!(message.contains("rollback expects partition key indices"));
    }
}