meta_srv/procedure/repartition/
group.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod repartition_start;
16pub(crate) mod update_metadata;
17
18use std::any::Any;
19use std::fmt::Debug;
20
21use common_error::ext::BoxedError;
22use common_meta::DatanodeId;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::instruction::CacheIdent;
25use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
26use common_meta::key::table_route::TableRouteValue;
27use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
28use common_meta::rpc::router::RegionRoute;
29use common_procedure::{Context as ProcedureContext, Status};
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::{RegionId, TableId};
33use uuid::Uuid;
34
35use crate::error::{self, Result};
36use crate::procedure::repartition::plan::RegionDescriptor;
37
/// Identifier of a repartition group; an alias of [`Uuid`].
pub type GroupId = Uuid;
39
/// The repartition group procedure.
///
/// NOTE(review): the struct currently has no fields; presumably it will own a
/// [`Context`] and the current [`State`] — confirm against the rest of the module.
pub struct RepartitionGroupProcedure {}
41
/// The context shared by the states of a repartition group.
///
/// It bundles the group's [`PersistentContext`] with the services the helper
/// methods on this type rely on.
pub struct Context {
    /// The serializable description and progress of the repartition group.
    pub persistent_ctx: PersistentContext,

    /// Used to broadcast the "invalidate table cache" message.
    pub cache_invalidator: CacheInvalidatorRef,

    /// Used to read the table route / datanode table values and to update the
    /// table route.
    pub table_metadata_manager: TableMetadataManagerRef,
}
49
/// The result of preparing a repartition group.
///
/// It is stored in [`PersistentContext::group_prepare_result`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    /// Region routes of the source regions.
    // NOTE(review): presumably resolved from the table route for
    // `PersistentContext::sources` — confirm in `RepartitionStart`.
    pub source_routes: Vec<RegionRoute>,
    /// Region routes of the target regions.
    // NOTE(review): presumably resolved from the table route for
    // `PersistentContext::targets` — confirm in `RepartitionStart`.
    pub target_routes: Vec<RegionRoute>,
    /// The central region of the group.
    // NOTE(review): how the central region is chosen is not visible here.
    pub central_region: RegionId,
    /// The datanode of the central region. Its datanode table value supplies
    /// the region info and options passed along when the table route is updated.
    pub central_region_datanode_id: DatanodeId,
}
57
/// The serializable context of a repartition group.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The id of the repartition group.
    pub group_id: GroupId,
    /// The table id of the repartition group.
    pub table_id: TableId,
    /// The source regions of the repartition group.
    pub sources: Vec<RegionDescriptor>,
    /// The target regions of the repartition group.
    pub targets: Vec<RegionDescriptor>,
    /// The result of group prepare.
    /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
    pub group_prepare_result: Option<GroupPrepareResult>,
}
71
72impl Context {
73    /// Retrieves the table route value for the given table id.
74    ///
75    /// Retry:
76    /// - Failed to retrieve the metadata of table.
77    ///
78    /// Abort:
79    /// - Table route not found.
80    pub async fn get_table_route_value(
81        &self,
82    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
83        let table_id = self.persistent_ctx.table_id;
84        let group_id = self.persistent_ctx.group_id;
85        let table_route_value = self
86            .table_metadata_manager
87            .table_route_manager()
88            .table_route_storage()
89            .get_with_raw_bytes(table_id)
90            .await
91            .map_err(BoxedError::new)
92            .with_context(|_| error::RetryLaterWithSourceSnafu {
93                reason: format!(
94                    "Failed to get table route for table: {}, repartition group: {}",
95                    table_id, group_id
96                ),
97            })?
98            .context(error::TableRouteNotFoundSnafu { table_id })?;
99
100        Ok(table_route_value)
101    }
102
103    /// Returns the `datanode_table_value`
104    ///
105    /// Retry:
106    /// - Failed to retrieve the metadata of datanode table.
107    pub async fn get_datanode_table_value(
108        &self,
109        table_id: TableId,
110        datanode_id: u64,
111    ) -> Result<DatanodeTableValue> {
112        let datanode_table_value = self
113            .table_metadata_manager
114            .datanode_table_manager()
115            .get(&DatanodeTableKey {
116                datanode_id,
117                table_id,
118            })
119            .await
120            .context(error::TableMetadataManagerSnafu)
121            .map_err(BoxedError::new)
122            .with_context(|_| error::RetryLaterWithSourceSnafu {
123                reason: format!("Failed to get DatanodeTable: {table_id}"),
124            })?
125            .context(error::DatanodeTableNotFoundSnafu {
126                table_id,
127                datanode_id,
128            })?;
129        Ok(datanode_table_value)
130    }
131
132    /// Broadcasts the invalidate table cache message.
133    pub async fn invalidate_table_cache(&self) -> Result<()> {
134        let table_id = self.persistent_ctx.table_id;
135        let group_id = self.persistent_ctx.group_id;
136        let subject = format!(
137            "Invalidate table cache for repartition table, group: {}, table: {}",
138            group_id, table_id,
139        );
140        let ctx = common_meta::cache_invalidator::Context {
141            subject: Some(subject),
142        };
143        let _ = self
144            .cache_invalidator
145            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
146            .await;
147        Ok(())
148    }
149
150    /// Updates the table route.
151    ///
152    /// Retry:
153    /// - Failed to retrieve the metadata of datanode table.
154    ///
155    /// Abort:
156    /// - Table route not found.
157    /// - Failed to update the table route.
158    pub async fn update_table_route(
159        &self,
160        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
161        new_region_routes: Vec<RegionRoute>,
162    ) -> Result<()> {
163        let table_id = self.persistent_ctx.table_id;
164        // Safety: prepare result is set in [RepartitionStart] state.
165        let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
166        let central_region_datanode_table_value = self
167            .get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
168            .await?;
169        let RegionInfo {
170            region_options,
171            region_wal_options,
172            ..
173        } = &central_region_datanode_table_value.region_info;
174
175        self.table_metadata_manager
176            .update_table_route(
177                table_id,
178                central_region_datanode_table_value.region_info.clone(),
179                current_table_route_value,
180                new_region_routes,
181                region_options,
182                region_wal_options,
183            )
184            .await
185            .context(error::TableMetadataManagerSnafu)
186    }
187}
188
189/// Returns the region routes of the given table route value.
190///
191/// Abort:
192/// - Table route value is not physical.
193pub fn region_routes(
194    table_id: TableId,
195    table_route_value: &TableRouteValue,
196) -> Result<&Vec<RegionRoute>> {
197    table_route_value
198        .region_routes()
199        .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
200            err_msg: format!(
201                "TableRoute({:?}) is a non-physical TableRouteValue.",
202                table_id
203            ),
204        })
205}
206
207#[async_trait::async_trait]
208#[typetag::serde(tag = "repartition_group_state")]
209pub(crate) trait State: Sync + Send + Debug {
210    fn name(&self) -> &'static str {
211        let type_name = std::any::type_name::<Self>();
212        // short name
213        type_name.split("::").last().unwrap_or(type_name)
214    }
215
216    /// Yields the next [State] and [Status].
217    async fn next(
218        &mut self,
219        ctx: &mut Context,
220        procedure_ctx: &ProcedureContext,
221    ) -> Result<(Box<dyn State>, Status)>;
222
223    fn as_any(&self) -> &dyn Any;
224}
225
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    /// Builds a [`TestingEnv`] whose table metadata manager sits on a mock kv
    /// backend that answers range requests with a "mock err" error.
    fn env_with_failing_range() -> TestingEnv {
        let failing_kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(failing_kv)));
        env
    }

    // A missing table route is reported as a non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));
        let err = ctx.get_table_route_value().await.unwrap_err();
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // A failing kv backend makes the table route lookup retryable.
    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let env = env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));
        let err = ctx.get_table_route_value().await.unwrap_err();
        assert!(err.is_retryable());
    }

    // A failing kv backend makes the datanode table lookup retryable.
    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let env = env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));
        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
        assert!(err.is_retryable());
    }
}