meta_srv/procedure/repartition/
deallocate_region.rs1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::drop_table::executor::DropTableExecutor;
19use common_meta::lock_key::TableLock;
20use common_meta::node_manager::NodeManagerRef;
21use common_meta::region_registry::LeaderRegionRegistryRef;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::{RegionId, TableId};
28use table::table_name::TableName;
29use table::table_reference::TableReference;
30use tokio::time::Instant;
31
32use crate::error::{self, Result};
33use crate::procedure::repartition::group::region_routes;
34use crate::procedure::repartition::repartition_end::RepartitionEnd;
35use crate::procedure::repartition::{Context, State};
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct DeallocateRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for DeallocateRegion {
43 async fn next(
44 &mut self,
45 ctx: &mut Context,
46 procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 let timer = Instant::now();
49 let region_to_deallocate = ctx
50 .persistent_ctx
51 .plans
52 .iter()
53 .map(|p| p.pending_deallocate_region_ids.len())
54 .sum::<usize>();
55 if region_to_deallocate == 0 {
56 ctx.update_deallocate_region_elapsed(timer.elapsed());
57 return Ok((Box::new(RepartitionEnd), Status::executing(false)));
58 }
59
60 let table_id = ctx.persistent_ctx.table_id;
61 let pending_deallocate_region_ids = ctx
62 .persistent_ctx
63 .plans
64 .iter()
65 .flat_map(|p| p.pending_deallocate_region_ids.iter())
66 .cloned()
67 .collect::<HashSet<_>>();
68 let dealloc_count = pending_deallocate_region_ids.len();
69 info!(
70 "Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
71 table_id, dealloc_count, pending_deallocate_region_ids
72 );
73
74 let table_lock = TableLock::Write(table_id).into();
75 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
76 let table_route_value = ctx.get_table_route_value().await?;
77 let deallocating_regions = {
78 let region_routes = region_routes(table_id, &table_route_value)?;
79 Self::filter_deallocatable_region_routes(
80 table_id,
81 region_routes,
82 &pending_deallocate_region_ids,
83 )
84 };
85
86 let table_ref = TableReference::full(
87 &ctx.persistent_ctx.catalog_name,
88 &ctx.persistent_ctx.schema_name,
89 &ctx.persistent_ctx.table_name,
90 );
91 Self::deallocate_regions(
94 &ctx.node_manager,
95 &ctx.leader_region_registry,
96 table_ref.into(),
97 table_id,
98 &deallocating_regions,
99 )
100 .await?;
101
102 let region_routes = table_route_value.region_routes().unwrap();
104 let new_region_routes =
105 Self::generate_region_routes(region_routes, &pending_deallocate_region_ids);
106 ctx.update_table_route(&table_route_value, new_region_routes, HashMap::new())
107 .await?;
108 ctx.invalidate_table_cache().await?;
109
110 ctx.update_deallocate_region_elapsed(timer.elapsed());
111 Ok((Box::new(RepartitionEnd), Status::executing(true)))
112 }
113
114 fn as_any(&self) -> &dyn Any {
115 self
116 }
117}
118
119impl DeallocateRegion {
120 pub(crate) async fn deallocate_regions(
121 node_manager: &NodeManagerRef,
122 leader_region_registry: &LeaderRegionRegistryRef,
123 table: TableName,
124 table_id: TableId,
125 region_routes: &[RegionRoute],
126 ) -> Result<()> {
127 let executor = DropTableExecutor::new(table, table_id, false);
128 executor
131 .on_drop_regions(
132 node_manager,
133 leader_region_registry,
134 region_routes,
135 false,
136 true,
137 true,
138 )
139 .await
140 .context(error::DeallocateRegionsSnafu { table_id })?;
141
142 Ok(())
143 }
144
145 pub(crate) fn filter_deallocatable_region_routes(
146 table_id: TableId,
147 region_routes: &[RegionRoute],
148 pending_deallocate_region_ids: &HashSet<RegionId>,
149 ) -> Vec<RegionRoute> {
150 let region_routes_map = region_routes
151 .iter()
152 .map(|r| (r.region.id, r.clone()))
153 .collect::<HashMap<_, _>>();
154 pending_deallocate_region_ids
155 .iter()
156 .filter_map(|region_id| match region_routes_map.get(region_id) {
157 Some(region_route) => Some(region_route.clone()),
158 None => {
159 warn!(
160 "Region {} not found during deallocate regions for table {:?}",
161 region_id, table_id
162 );
163 None
164 }
165 })
166 .collect::<Vec<_>>()
167 }
168
169 pub(crate) fn generate_region_routes(
170 region_routes: &[RegionRoute],
171 pending_deallocate_region_ids: &HashSet<RegionId>,
172 ) -> Vec<RegionRoute> {
173 region_routes
175 .iter()
176 .filter(|r| !pending_deallocate_region_ids.contains(&r.region.id))
177 .cloned()
178 .collect()
179 }
180}
181
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_meta::ddl::test_util::datanode_handler::RetryErrorDatanodeHandler;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::MockDatanodeManager;
    use store_api::storage::{RegionId, TableId};

    use crate::error::Error;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::deallocate_region::DeallocateRegion;
    use crate::procedure::repartition::plan::RepartitionPlanEntry;
    use crate::procedure::repartition::test_util::{
        TestingEnv, current_parent_region_routes, new_parent_context,
    };

    /// Two region routes (regions 1 and 2) led by datanodes 1 and 2 respectively.
    fn test_region_routes(table_id: TableId) -> Vec<RegionRoute> {
        vec![
            RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 1),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            },
            RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 2),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(2)),
                ..Default::default()
            },
        ]
    }

    #[test]
    fn test_filter_deallocatable_region_routes() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(deallocatable_region_routes.len(), 1);
        assert_eq!(
            deallocatable_region_routes[0].region.id,
            RegionId::new(table_id, 1)
        );
    }

    #[test]
    fn test_filter_deallocatable_region_routes_skips_missing_regions() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        // Region 3 has no route and must be ignored rather than cause a failure.
        let pending_deallocate_region_ids =
            HashSet::from([RegionId::new(table_id, 1), RegionId::new(table_id, 3)]);
        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(deallocatable_region_routes.len(), 1);
        assert_eq!(
            deallocatable_region_routes[0].region.id,
            RegionId::new(table_id, 1)
        );
    }

    #[test]
    fn test_generate_region_routes() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
        let new_region_routes = DeallocateRegion::generate_region_routes(
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(new_region_routes.len(), 1);
        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2));
    }

    #[test]
    fn test_generate_region_routes_ignores_missing_regions() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 3)]);
        let new_region_routes = DeallocateRegion::generate_region_routes(
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(new_region_routes, region_routes);
    }

    #[tokio::test]
    async fn test_next_retryable_when_deallocate_regions_retry_later() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let original_routes = test_region_routes(table_id);

        env.create_physical_table_metadata(table_id, original_routes.clone())
            .await;

        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![],
            target_regions: vec![],
            allocated_region_ids: vec![],
            pending_deallocate_region_ids: vec![RegionId::new(table_id, 1)],
            transition_map: vec![],
        }];

        let mut state = DeallocateRegion;

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(matches!(err, Error::DeallocateRegions { .. }));
        assert!(err.is_retryable());
        // A failed drop must leave the table route untouched.
        assert_eq!(current_parent_region_routes(&ctx).await, original_routes);
    }
}