meta_srv/procedure/repartition/
deallocate_region.rs1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::drop_table::executor::DropTableExecutor;
19use common_meta::lock_key::TableLock;
20use common_meta::node_manager::NodeManagerRef;
21use common_meta::region_registry::LeaderRegionRegistryRef;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::{RegionId, TableId};
28use table::table_name::TableName;
29use table::table_reference::TableReference;
30use tokio::time::Instant;
31
32use crate::error::{self, Result};
33use crate::procedure::repartition::group::region_routes;
34use crate::procedure::repartition::repartition_end::RepartitionEnd;
35use crate::procedure::repartition::{Context, State};
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct DeallocateRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for DeallocateRegion {
43 async fn next(
44 &mut self,
45 ctx: &mut Context,
46 procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 let timer = Instant::now();
49 let region_to_deallocate = ctx
50 .persistent_ctx
51 .plans
52 .iter()
53 .map(|p| p.pending_deallocate_region_ids.len())
54 .sum::<usize>();
55 if region_to_deallocate == 0 {
56 ctx.update_deallocate_region_elapsed(timer.elapsed());
57 return Ok((Box::new(RepartitionEnd), Status::executing(false)));
58 }
59
60 let table_id = ctx.persistent_ctx.table_id;
61 let pending_deallocate_region_ids = ctx
62 .persistent_ctx
63 .plans
64 .iter()
65 .flat_map(|p| p.pending_deallocate_region_ids.iter())
66 .cloned()
67 .collect::<HashSet<_>>();
68 let dealloc_count = pending_deallocate_region_ids.len();
69 info!(
70 "Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
71 table_id, dealloc_count, pending_deallocate_region_ids
72 );
73
74 let table_lock = TableLock::Write(table_id).into();
75 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
76 let table_route_value = ctx.get_table_route_value().await?;
77 let deallocating_regions = {
78 let region_routes = region_routes(table_id, &table_route_value)?;
79 Self::filter_deallocatable_region_routes(
80 table_id,
81 region_routes,
82 &pending_deallocate_region_ids,
83 )
84 };
85
86 let table_ref = TableReference::full(
87 &ctx.persistent_ctx.catalog_name,
88 &ctx.persistent_ctx.schema_name,
89 &ctx.persistent_ctx.table_name,
90 );
91 Self::deallocate_regions(
93 &ctx.node_manager,
94 &ctx.leader_region_registry,
95 table_ref.into(),
96 table_id,
97 &deallocating_regions,
98 )
99 .await?;
100
101 let region_routes = table_route_value.region_routes().unwrap();
103 let new_region_routes =
104 Self::generate_region_routes(region_routes, &pending_deallocate_region_ids);
105 ctx.update_table_route(&table_route_value, new_region_routes, HashMap::new())
106 .await?;
107 ctx.invalidate_table_cache().await?;
108
109 ctx.update_deallocate_region_elapsed(timer.elapsed());
110 Ok((Box::new(RepartitionEnd), Status::executing(true)))
111 }
112
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116}
117
118impl DeallocateRegion {
119 async fn deallocate_regions(
120 node_manager: &NodeManagerRef,
121 leader_region_registry: &LeaderRegionRegistryRef,
122 table: TableName,
123 table_id: TableId,
124 region_routes: &[RegionRoute],
125 ) -> Result<()> {
126 let executor = DropTableExecutor::new(table, table_id, false);
127 executor
130 .on_drop_regions(
131 node_manager,
132 leader_region_registry,
133 region_routes,
134 false,
135 true,
136 true,
137 )
138 .await
139 .context(error::DeallocateRegionsSnafu { table_id })?;
140
141 Ok(())
142 }
143
144 fn filter_deallocatable_region_routes(
145 table_id: TableId,
146 region_routes: &[RegionRoute],
147 pending_deallocate_region_ids: &HashSet<RegionId>,
148 ) -> Vec<RegionRoute> {
149 let region_routes_map = region_routes
150 .iter()
151 .map(|r| (r.region.id, r.clone()))
152 .collect::<HashMap<_, _>>();
153 pending_deallocate_region_ids
154 .iter()
155 .filter_map(|region_id| match region_routes_map.get(region_id) {
156 Some(region_route) => Some(region_route.clone()),
157 None => {
158 warn!(
159 "Region {} not found during deallocate regions for table {:?}",
160 region_id, table_id
161 );
162 None
163 }
164 })
165 .collect::<Vec<_>>()
166 }
167
168 fn generate_region_routes(
169 region_routes: &[RegionRoute],
170 pending_deallocate_region_ids: &HashSet<RegionId>,
171 ) -> Vec<RegionRoute> {
172 region_routes
174 .iter()
175 .filter(|r| !pending_deallocate_region_ids.contains(&r.region.id))
176 .cloned()
177 .collect()
178 }
179}
180
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::{RegionId, TableId};

    use crate::procedure::repartition::deallocate_region::DeallocateRegion;

    /// Builds two routes for `table_id`: region 1 led by peer 1 and region 2
    /// led by peer 2.
    fn test_region_routes(table_id: TableId) -> Vec<RegionRoute> {
        vec![
            RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 1),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            },
            RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 2),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(2)),
                ..Default::default()
            },
        ]
    }

    #[test]
    fn test_filter_deallocatable_region_routes() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(deallocatable_region_routes.len(), 1);
        assert_eq!(
            deallocatable_region_routes[0].region.id,
            RegionId::new(table_id, 1)
        );
    }

    #[test]
    fn test_filter_deallocatable_region_routes_skips_unknown_region() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        // Region 3 has no route, so it must be skipped rather than returned.
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 3)]);
        let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert!(deallocatable_region_routes.is_empty());
    }

    #[test]
    fn test_generate_region_routes() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]);
        let new_region_routes = DeallocateRegion::generate_region_routes(
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(new_region_routes.len(), 1);
        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2));
    }

    #[test]
    fn test_generate_region_routes_keeps_all_when_nothing_pending() {
        let table_id = 1024;
        let region_routes = test_region_routes(table_id);
        let pending_deallocate_region_ids = HashSet::new();
        let new_region_routes = DeallocateRegion::generate_region_routes(
            &region_routes,
            &pending_deallocate_region_ids,
        );
        assert_eq!(new_region_routes.len(), 2);
        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(new_region_routes[1].region.id, RegionId::new(table_id, 2));
    }
}