Skip to main content

meta_srv/procedure/repartition/
deallocate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::drop_table::executor::DropTableExecutor;
19use common_meta::lock_key::TableLock;
20use common_meta::node_manager::NodeManagerRef;
21use common_meta::region_registry::LeaderRegionRegistryRef;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::{RegionId, TableId};
28use table::table_name::TableName;
29use table::table_reference::TableReference;
30use tokio::time::Instant;
31
32use crate::error::{self, Result};
33use crate::procedure::repartition::group::region_routes;
34use crate::procedure::repartition::repartition_end::RepartitionEnd;
35use crate::procedure::repartition::{Context, State};
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct DeallocateRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for DeallocateRegion {
43    async fn next(
44        &mut self,
45        ctx: &mut Context,
46        procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        let timer = Instant::now();
49        let region_to_deallocate = ctx
50            .persistent_ctx
51            .plans
52            .iter()
53            .map(|p| p.pending_deallocate_region_ids.len())
54            .sum::<usize>();
55        if region_to_deallocate == 0 {
56            ctx.update_deallocate_region_elapsed(timer.elapsed());
57            return Ok((Box::new(RepartitionEnd), Status::executing(false)));
58        }
59
60        let table_id = ctx.persistent_ctx.table_id;
61        let pending_deallocate_region_ids = ctx
62            .persistent_ctx
63            .plans
64            .iter()
65            .flat_map(|p| p.pending_deallocate_region_ids.iter())
66            .cloned()
67            .collect::<HashSet<_>>();
68        let dealloc_count = pending_deallocate_region_ids.len();
69        info!(
70            "Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
71            table_id, dealloc_count, pending_deallocate_region_ids
72        );
73
74        let table_lock = TableLock::Write(table_id).into();
75        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
76        let table_route_value = ctx.get_table_route_value().await?;
77        let deallocating_regions = {
78            let region_routes = region_routes(table_id, &table_route_value)?;
79            Self::filter_deallocatable_region_routes(
80                table_id,
81                region_routes,
82                &pending_deallocate_region_ids,
83            )
84        };
85
86        let table_ref = TableReference::full(
87            &ctx.persistent_ctx.catalog_name,
88            &ctx.persistent_ctx.schema_name,
89            &ctx.persistent_ctx.table_name,
90        );
91        // Memory guards are not required here,
92        // because the table metadata still contains routes for the deallocating regions.
93        Self::deallocate_regions(
94            &ctx.node_manager,
95            &ctx.leader_region_registry,
96            table_ref.into(),
97            table_id,
98            &deallocating_regions,
99        )
100        .await?;
101
102        // Safety: the table route must be physical, so we can safely unwrap the region routes.
103        let region_routes = table_route_value.region_routes().unwrap();
104        let new_region_routes =
105            Self::generate_region_routes(region_routes, &pending_deallocate_region_ids);
106        ctx.update_table_route(&table_route_value, new_region_routes, HashMap::new())
107            .await?;
108        ctx.invalidate_table_cache().await?;
109
110        ctx.update_deallocate_region_elapsed(timer.elapsed());
111        Ok((Box::new(RepartitionEnd), Status::executing(true)))
112    }
113
114    fn as_any(&self) -> &dyn Any {
115        self
116    }
117}
118
119impl DeallocateRegion {
120    pub(crate) async fn deallocate_regions(
121        node_manager: &NodeManagerRef,
122        leader_region_registry: &LeaderRegionRegistryRef,
123        table: TableName,
124        table_id: TableId,
125        region_routes: &[RegionRoute],
126    ) -> Result<()> {
127        let executor = DropTableExecutor::new(table, table_id, false);
128        // Note: Consider adding an option to forcefully drop the physical region,
129        // which would involve dropping all logical regions associated with that physical region.
130        executor
131            .on_drop_regions(
132                node_manager,
133                leader_region_registry,
134                region_routes,
135                false,
136                true,
137                true,
138            )
139            .await
140            .context(error::DeallocateRegionsSnafu { table_id })?;
141
142        Ok(())
143    }
144
145    pub(crate) fn filter_deallocatable_region_routes(
146        table_id: TableId,
147        region_routes: &[RegionRoute],
148        pending_deallocate_region_ids: &HashSet<RegionId>,
149    ) -> Vec<RegionRoute> {
150        let region_routes_map = region_routes
151            .iter()
152            .map(|r| (r.region.id, r.clone()))
153            .collect::<HashMap<_, _>>();
154        pending_deallocate_region_ids
155            .iter()
156            .filter_map(|region_id| match region_routes_map.get(region_id) {
157                Some(region_route) => Some(region_route.clone()),
158                None => {
159                    warn!(
160                        "Region {} not found during deallocate regions for table {:?}",
161                        region_id, table_id
162                    );
163                    None
164                }
165            })
166            .collect::<Vec<_>>()
167    }
168
169    pub(crate) fn generate_region_routes(
170        region_routes: &[RegionRoute],
171        pending_deallocate_region_ids: &HashSet<RegionId>,
172    ) -> Vec<RegionRoute> {
173        // Safety: the table route must be physical, so we can safely unwrap the region routes.
174        region_routes
175            .iter()
176            .filter(|r| !pending_deallocate_region_ids.contains(&r.region.id))
177            .cloned()
178            .collect()
179    }
180}
181
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_meta::ddl::test_util::datanode_handler::RetryErrorDatanodeHandler;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::MockDatanodeManager;
    use store_api::storage::{RegionId, TableId};

    use crate::error::Error;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::deallocate_region::DeallocateRegion;
    use crate::procedure::repartition::plan::RepartitionPlanEntry;
    use crate::procedure::repartition::test_util::{
        TestingEnv, current_parent_region_routes, new_parent_context,
    };

    /// Two routes for `table_id`: region 1 led by peer 1 and region 2 led by peer 2.
    fn test_region_routes(table_id: TableId) -> Vec<RegionRoute> {
        vec![
            RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 1),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            },
            RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 2),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(2)),
                ..Default::default()
            },
        ]
    }

    #[test]
    fn test_filter_deallocatable_region_routes() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(deallocatable_region_routes.len(), 1);
        assert_eq!(
            deallocatable_region_routes[0].region.id,
            RegionId::new(table_id, 1)
        );
    }

    #[test]
    fn test_filter_deallocatable_region_routes_skips_missing_regions() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        // Region 3 has no route: it must be skipped rather than reported as an error.
        let pending_deallocate_region_ids =
            HashSet::from([RegionId::new(table_id, 2), RegionId::new(table_id, 3)]);
        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(deallocatable_region_routes.len(), 1);
        assert_eq!(
            deallocatable_region_routes[0].region.id,
            RegionId::new(table_id, 2)
        );

        // Nothing pending means nothing to deallocate.
        let no_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &HashSet::new(),
        );
        assert!(no_routes.is_empty());
    }

    #[test]
    fn test_generate_region_routes() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
        let new_region_routes = DeallocateRegion::generate_region_routes(
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(new_region_routes.len(), 1);
        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2));
    }

    #[test]
    fn test_generate_region_routes_without_pending_regions() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        // With nothing pending, every route is kept in its original order.
        let new_region_routes =
            DeallocateRegion::generate_region_routes(&region_routes, &HashSet::new());
        assert_eq!(new_region_routes, region_routes);
    }

    #[tokio::test]
    async fn test_next_retryable_when_deallocate_regions_retry_later() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let original_routes = test_region_routes(table_id);

        env.create_physical_table_metadata(table_id, original_routes.clone())
            .await;

        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![],
            target_regions: vec![],
            allocated_region_ids: vec![],
            pending_deallocate_region_ids: vec![RegionId::new(table_id, 1)],
            transition_map: vec![],
        }];

        let mut state = DeallocateRegion;

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(matches!(err, Error::DeallocateRegions { .. }));
        assert!(err.is_retryable());
        // A failed drop must leave the table route untouched.
        assert_eq!(current_parent_region_routes(&ctx).await, original_routes);
    }
}