meta_srv/procedure/repartition/
deallocate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::drop_table::executor::DropTableExecutor;
19use common_meta::lock_key::TableLock;
20use common_meta::node_manager::NodeManagerRef;
21use common_meta::region_registry::LeaderRegionRegistryRef;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::{RegionId, TableId};
28use table::table_name::TableName;
29use table::table_reference::TableReference;
30use tokio::time::Instant;
31
32use crate::error::{self, Result};
33use crate::procedure::repartition::group::region_routes;
34use crate::procedure::repartition::repartition_end::RepartitionEnd;
35use crate::procedure::repartition::{Context, State};
36
/// Repartition procedure state that deallocates the regions listed in the plans'
/// `pending_deallocate_region_ids`.
///
/// Its `State::next` implementation drops those regions on the datanodes, removes them
/// from the table route, and then transitions to `RepartitionEnd`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeallocateRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for DeallocateRegion {
43    async fn next(
44        &mut self,
45        ctx: &mut Context,
46        procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        let timer = Instant::now();
49        let region_to_deallocate = ctx
50            .persistent_ctx
51            .plans
52            .iter()
53            .map(|p| p.pending_deallocate_region_ids.len())
54            .sum::<usize>();
55        if region_to_deallocate == 0 {
56            ctx.update_deallocate_region_elapsed(timer.elapsed());
57            return Ok((Box::new(RepartitionEnd), Status::executing(false)));
58        }
59
60        let table_id = ctx.persistent_ctx.table_id;
61        let pending_deallocate_region_ids = ctx
62            .persistent_ctx
63            .plans
64            .iter()
65            .flat_map(|p| p.pending_deallocate_region_ids.iter())
66            .cloned()
67            .collect::<HashSet<_>>();
68        let dealloc_count = pending_deallocate_region_ids.len();
69        info!(
70            "Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
71            table_id, dealloc_count, pending_deallocate_region_ids
72        );
73
74        let table_lock = TableLock::Write(table_id).into();
75        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
76        let table_route_value = ctx.get_table_route_value().await?;
77        let deallocating_regions = {
78            let region_routes = region_routes(table_id, &table_route_value)?;
79            Self::filter_deallocatable_region_routes(
80                table_id,
81                region_routes,
82                &pending_deallocate_region_ids,
83            )
84        };
85
86        let table_ref = TableReference::full(
87            &ctx.persistent_ctx.catalog_name,
88            &ctx.persistent_ctx.schema_name,
89            &ctx.persistent_ctx.table_name,
90        );
91        // Deallocates the regions on datanodes.
92        Self::deallocate_regions(
93            &ctx.node_manager,
94            &ctx.leader_region_registry,
95            table_ref.into(),
96            table_id,
97            &deallocating_regions,
98        )
99        .await?;
100
101        // Safety: the table route must be physical, so we can safely unwrap the region routes.
102        let region_routes = table_route_value.region_routes().unwrap();
103        let new_region_routes =
104            Self::generate_region_routes(region_routes, &pending_deallocate_region_ids);
105        ctx.update_table_route(&table_route_value, new_region_routes, HashMap::new())
106            .await?;
107        ctx.invalidate_table_cache().await?;
108
109        ctx.update_deallocate_region_elapsed(timer.elapsed());
110        Ok((Box::new(RepartitionEnd), Status::executing(true)))
111    }
112
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116}
117
118impl DeallocateRegion {
119    async fn deallocate_regions(
120        node_manager: &NodeManagerRef,
121        leader_region_registry: &LeaderRegionRegistryRef,
122        table: TableName,
123        table_id: TableId,
124        region_routes: &[RegionRoute],
125    ) -> Result<()> {
126        let executor = DropTableExecutor::new(table, table_id, false);
127        // Note: Consider adding an option to forcefully drop the physical region,
128        // which would involve dropping all logical regions associated with that physical region.
129        executor
130            .on_drop_regions(
131                node_manager,
132                leader_region_registry,
133                region_routes,
134                false,
135                true,
136                true,
137            )
138            .await
139            .context(error::DeallocateRegionsSnafu { table_id })?;
140
141        Ok(())
142    }
143
144    fn filter_deallocatable_region_routes(
145        table_id: TableId,
146        region_routes: &[RegionRoute],
147        pending_deallocate_region_ids: &HashSet<RegionId>,
148    ) -> Vec<RegionRoute> {
149        let region_routes_map = region_routes
150            .iter()
151            .map(|r| (r.region.id, r.clone()))
152            .collect::<HashMap<_, _>>();
153        pending_deallocate_region_ids
154            .iter()
155            .filter_map(|region_id| match region_routes_map.get(region_id) {
156                Some(region_route) => Some(region_route.clone()),
157                None => {
158                    warn!(
159                        "Region {} not found during deallocate regions for table {:?}",
160                        region_id, table_id
161                    );
162                    None
163                }
164            })
165            .collect::<Vec<_>>()
166    }
167
168    fn generate_region_routes(
169        region_routes: &[RegionRoute],
170        pending_deallocate_region_ids: &HashSet<RegionId>,
171    ) -> Vec<RegionRoute> {
172        // Safety: the table route must be physical, so we can safely unwrap the region routes.
173        region_routes
174            .iter()
175            .filter(|r| !pending_deallocate_region_ids.contains(&r.region.id))
176            .cloned()
177            .collect()
178    }
179}
180
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::{RegionId, TableId};

    use crate::procedure::repartition::deallocate_region::DeallocateRegion;

    /// Builds a route for `region_id` led by `leader`; every other field is defaulted.
    fn route_with_leader(region_id: RegionId, leader: Peer) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: region_id,
                ..Default::default()
            },
            leader_peer: Some(leader),
            ..Default::default()
        }
    }

    /// Two routes for `table_id`: region 1 led by peer 1 and region 2 led by peer 2.
    fn test_region_routes(table_id: TableId) -> Vec<RegionRoute> {
        vec![
            route_with_leader(RegionId::new(table_id, 1), Peer::empty(1)),
            route_with_leader(RegionId::new(table_id, 2), Peer::empty(2)),
        ]
    }

    /// Only the route of the pending region is selected for deallocation.
    #[test]
    fn test_filter_deallocatable_region_routes() {
        let table_id: TableId = 1024;
        let routes = test_region_routes(table_id);
        let pending = HashSet::from([RegionId::new(table_id, 1)]);

        let selected =
            DeallocateRegion::filter_deallocatable_region_routes(table_id, &routes, &pending);

        let selected_ids: Vec<_> = selected.iter().map(|r| r.region.id).collect();
        assert_eq!(selected_ids, vec![RegionId::new(table_id, 1)]);
    }

    /// The pending region is removed from the generated routes; the other one is kept.
    #[test]
    fn test_generate_region_routes() {
        let table_id: TableId = 1024;
        let routes = test_region_routes(table_id);
        let pending = HashSet::from([RegionId::new(table_id, 1)]);

        let remaining = DeallocateRegion::generate_region_routes(&routes, &pending);

        let remaining_ids: Vec<_> = remaining.iter().map(|r| r.region.id).collect();
        assert_eq!(remaining_ids, vec![RegionId::new(table_id, 2)]);
    }
}