meta_srv/procedure/repartition/
group.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod enter_staging_region;
16pub(crate) mod repartition_start;
17pub(crate) mod update_metadata;
18pub(crate) mod utils;
19
20use std::any::Any;
21use std::fmt::Debug;
22use std::time::Duration;
23
24use common_error::ext::BoxedError;
25use common_meta::DatanodeId;
26use common_meta::cache_invalidator::CacheInvalidatorRef;
27use common_meta::instruction::CacheIdent;
28use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
29use common_meta::key::table_route::TableRouteValue;
30use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
31use common_meta::rpc::router::RegionRoute;
32use common_procedure::{
33    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
34    UserMetadata,
35};
36use serde::{Deserialize, Serialize};
37use snafu::{OptionExt, ResultExt};
38use store_api::storage::{RegionId, TableId};
39use uuid::Uuid;
40
41use crate::error::{self, Result};
42use crate::procedure::repartition::group::repartition_start::RepartitionStart;
43use crate::procedure::repartition::plan::RegionDescriptor;
44use crate::procedure::repartition::{self};
45use crate::service::mailbox::MailboxRef;
46
47pub type GroupId = Uuid;
48
49#[allow(dead_code)]
50pub struct RepartitionGroupProcedure {
51    state: Box<dyn State>,
52    context: Context,
53}
54
55impl RepartitionGroupProcedure {
56    const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
57
58    pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
59        let state = Box::new(RepartitionStart);
60
61        Self {
62            state,
63            context: Context {
64                persistent_ctx: persistent_context,
65                cache_invalidator: context.cache_invalidator.clone(),
66                table_metadata_manager: context.table_metadata_manager.clone(),
67                mailbox: context.mailbox.clone(),
68                server_addr: context.server_addr.clone(),
69            },
70        }
71    }
72}
73
74#[async_trait::async_trait]
75impl Procedure for RepartitionGroupProcedure {
76    fn type_name(&self) -> &str {
77        Self::TYPE_NAME
78    }
79
80    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
81        todo!()
82    }
83
84    async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
85        todo!()
86    }
87
88    fn rollback_supported(&self) -> bool {
89        true
90    }
91
92    fn dump(&self) -> ProcedureResult<String> {
93        todo!()
94    }
95
96    fn lock_key(&self) -> LockKey {
97        todo!()
98    }
99
100    fn user_metadata(&self) -> Option<UserMetadata> {
101        todo!()
102    }
103}
104
105pub struct Context {
106    pub persistent_ctx: PersistentContext,
107
108    pub cache_invalidator: CacheInvalidatorRef,
109
110    pub table_metadata_manager: TableMetadataManagerRef,
111
112    pub mailbox: MailboxRef,
113
114    pub server_addr: String,
115}
116
117/// The result of the group preparation phase, containing validated region routes.
118#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
119pub struct GroupPrepareResult {
120    /// The validated source region routes.
121    pub source_routes: Vec<RegionRoute>,
122    /// The validated target region routes.
123    pub target_routes: Vec<RegionRoute>,
124    /// The primary source region id (first source region), used for retrieving region options.
125    pub central_region: RegionId,
126    /// The datanode id where the primary source region is located.
127    pub central_region_datanode_id: DatanodeId,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
131pub struct PersistentContext {
132    pub group_id: GroupId,
133    /// The table id of the repartition group.
134    pub table_id: TableId,
135    /// The source regions of the repartition group.
136    pub sources: Vec<RegionDescriptor>,
137    /// The target regions of the repartition group.
138    pub targets: Vec<RegionDescriptor>,
139    /// The result of group prepare.
140    /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
141    pub group_prepare_result: Option<GroupPrepareResult>,
142}
143
144impl PersistentContext {
145    pub fn new(
146        group_id: GroupId,
147        table_id: TableId,
148        sources: Vec<RegionDescriptor>,
149        targets: Vec<RegionDescriptor>,
150    ) -> Self {
151        Self {
152            group_id,
153            table_id,
154            sources,
155            targets,
156            group_prepare_result: None,
157        }
158    }
159}
160
161impl Context {
162    /// Retrieves the table route value for the given table id.
163    ///
164    /// Retry:
165    /// - Failed to retrieve the metadata of table.
166    ///
167    /// Abort:
168    /// - Table route not found.
169    pub async fn get_table_route_value(
170        &self,
171    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
172        let table_id = self.persistent_ctx.table_id;
173        let group_id = self.persistent_ctx.group_id;
174        let table_route_value = self
175            .table_metadata_manager
176            .table_route_manager()
177            .table_route_storage()
178            .get_with_raw_bytes(table_id)
179            .await
180            .map_err(BoxedError::new)
181            .with_context(|_| error::RetryLaterWithSourceSnafu {
182                reason: format!(
183                    "Failed to get table route for table: {}, repartition group: {}",
184                    table_id, group_id
185                ),
186            })?
187            .context(error::TableRouteNotFoundSnafu { table_id })?;
188
189        Ok(table_route_value)
190    }
191
192    /// Returns the `datanode_table_value`
193    ///
194    /// Retry:
195    /// - Failed to retrieve the metadata of datanode table.
196    pub async fn get_datanode_table_value(
197        &self,
198        table_id: TableId,
199        datanode_id: u64,
200    ) -> Result<DatanodeTableValue> {
201        let datanode_table_value = self
202            .table_metadata_manager
203            .datanode_table_manager()
204            .get(&DatanodeTableKey {
205                datanode_id,
206                table_id,
207            })
208            .await
209            .context(error::TableMetadataManagerSnafu)
210            .map_err(BoxedError::new)
211            .with_context(|_| error::RetryLaterWithSourceSnafu {
212                reason: format!("Failed to get DatanodeTable: {table_id}"),
213            })?
214            .context(error::DatanodeTableNotFoundSnafu {
215                table_id,
216                datanode_id,
217            })?;
218        Ok(datanode_table_value)
219    }
220
221    /// Broadcasts the invalidate table cache message.
222    pub async fn invalidate_table_cache(&self) -> Result<()> {
223        let table_id = self.persistent_ctx.table_id;
224        let group_id = self.persistent_ctx.group_id;
225        let subject = format!(
226            "Invalidate table cache for repartition table, group: {}, table: {}",
227            group_id, table_id,
228        );
229        let ctx = common_meta::cache_invalidator::Context {
230            subject: Some(subject),
231        };
232        let _ = self
233            .cache_invalidator
234            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
235            .await;
236        Ok(())
237    }
238
239    /// Updates the table route.
240    ///
241    /// Retry:
242    /// - Failed to retrieve the metadata of datanode table.
243    ///
244    /// Abort:
245    /// - Table route not found.
246    /// - Failed to update the table route.
247    pub async fn update_table_route(
248        &self,
249        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
250        new_region_routes: Vec<RegionRoute>,
251    ) -> Result<()> {
252        let table_id = self.persistent_ctx.table_id;
253        // Safety: prepare result is set in [RepartitionStart] state.
254        let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
255        let central_region_datanode_table_value = self
256            .get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
257            .await?;
258        let RegionInfo {
259            region_options,
260            region_wal_options,
261            ..
262        } = &central_region_datanode_table_value.region_info;
263
264        self.table_metadata_manager
265            .update_table_route(
266                table_id,
267                central_region_datanode_table_value.region_info.clone(),
268                current_table_route_value,
269                new_region_routes,
270                region_options,
271                region_wal_options,
272            )
273            .await
274            .context(error::TableMetadataManagerSnafu)
275    }
276
277    /// Returns the next operation timeout.
278    ///
279    /// If the next operation timeout is not set, it will return `None`.
280    pub fn next_operation_timeout(&self) -> Option<Duration> {
281        Some(Duration::from_secs(10))
282    }
283}
284
285/// Returns the region routes of the given table route value.
286///
287/// Abort:
288/// - Table route value is not physical.
289pub fn region_routes(
290    table_id: TableId,
291    table_route_value: &TableRouteValue,
292) -> Result<&Vec<RegionRoute>> {
293    table_route_value
294        .region_routes()
295        .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
296            err_msg: format!(
297                "TableRoute({:?}) is a non-physical TableRouteValue.",
298                table_id
299            ),
300        })
301}
302
303#[async_trait::async_trait]
304#[typetag::serde(tag = "repartition_group_state")]
305pub(crate) trait State: Sync + Send + Debug {
306    fn name(&self) -> &'static str {
307        let type_name = std::any::type_name::<Self>();
308        // short name
309        type_name.split("::").last().unwrap_or(type_name)
310    }
311
312    /// Yields the next [State] and [Status].
313    async fn next(
314        &mut self,
315        ctx: &mut Context,
316        procedure_ctx: &ProcedureContext,
317    ) -> Result<(Box<dyn State>, Status)>;
318
319    fn as_any(&self) -> &dyn Any;
320}
321
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    /// Builds a [`TestingEnv`] whose kv backend fails every `range` request with a mock error.
    fn new_env_with_failing_range() -> TestingEnv {
        let failing_kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(failing_kv)));
        env
    }

    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let env = new_env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();

        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let env = new_env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();

        assert!(err.is_retryable());
    }
}