1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20 CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
25use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
26use common_procedure::{Context as ProcedureContext, Status};
27use common_telemetry::info;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::storage::{RegionNumber, TableId};
31use table::metadata::RawTableInfo;
32use table::table_reference::TableReference;
33use tokio::time::Instant;
34
35use crate::error::{self, Result};
36use crate::procedure::repartition::dispatch::Dispatch;
37use crate::procedure::repartition::plan::{
38 AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
39 convert_allocation_plan_to_repartition_plan,
40};
41use crate::procedure::repartition::{Context, State};
42
/// Repartition procedure state that allocates the regions required by the
/// repartition plans and then hands off to [`Dispatch`].
///
/// NOTE(review): this struct derives `Serialize`/`Deserialize` and is registered via
/// `typetag::serde`, so it is presumably persisted as procedure state; renaming the
/// field would likely break deserialization of in-flight procedures — confirm before
/// changing it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocateRegion {
    /// Allocation plans to convert into repartition plans in [`State::next`].
    plan_entries: Vec<AllocationPlanEntry>,
}
47
48#[async_trait::async_trait]
49#[typetag::serde]
50impl State for AllocateRegion {
51 async fn next(
52 &mut self,
53 ctx: &mut Context,
54 procedure_ctx: &ProcedureContext,
55 ) -> Result<(Box<dyn State>, Status)> {
56 let timer = Instant::now();
57 let table_id = ctx.persistent_ctx.table_id;
58 let table_route_value = ctx.get_table_route_value().await?;
59 let region_routes = table_route_value.region_routes().unwrap();
61 let mut next_region_number =
62 Self::get_next_region_number(table_route_value.max_region_number().unwrap());
63
64 let repartition_plan_entries = Self::convert_to_repartition_plans(
66 table_id,
67 &mut next_region_number,
68 &self.plan_entries,
69 );
70 let plan_count = repartition_plan_entries.len();
71 let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
72 info!(
73 "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
74 table_id, plan_count, to_allocate
75 );
76
77 if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
79 ctx.persistent_ctx.plans = repartition_plan_entries;
80 ctx.update_allocate_region_elapsed(timer.elapsed());
81 return Ok((Box::new(Dispatch), Status::executing(true)));
82 }
83
84 let allocate_regions = Self::collect_allocate_regions(&repartition_plan_entries);
85 let region_number_and_partition_exprs =
86 Self::prepare_region_allocation_data(&allocate_regions)?;
87 let table_info_value = ctx.get_table_info_value().await?;
88 let new_allocated_region_routes = ctx
89 .region_routes_allocator
90 .allocate(
91 table_id,
92 ®ion_number_and_partition_exprs
93 .iter()
94 .map(|(n, p)| (*n, p.as_str()))
95 .collect::<Vec<_>>(),
96 )
97 .await
98 .context(error::AllocateRegionRoutesSnafu { table_id })?;
99 let wal_options = ctx
100 .wal_options_allocator
101 .allocate(
102 &allocate_regions
103 .iter()
104 .map(|r| r.region_id.region_number())
105 .collect::<Vec<_>>(),
106 table_info_value.table_info.meta.options.skip_wal,
107 )
108 .await
109 .context(error::AllocateWalOptionsSnafu { table_id })?;
110
111 let new_region_count = new_allocated_region_routes.len();
112 let new_regions_brief: Vec<_> = new_allocated_region_routes
113 .iter()
114 .map(|route| {
115 let region_id = route.region.id;
116 let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
117 format!("region_id: {}, peer: {}", region_id, peer)
118 })
119 .collect();
120 info!(
121 "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
122 table_id, new_region_count, new_regions_brief
123 );
124
125 let _operating_guards = Self::register_operating_regions(
126 &ctx.memory_region_keeper,
127 &new_allocated_region_routes,
128 )?;
129 Self::allocate_regions(
131 &ctx.node_manager,
132 &table_info_value.table_info,
133 &new_allocated_region_routes,
134 &wal_options,
135 )
136 .await?;
137
138 let table_lock = TableLock::Write(table_id).into();
142 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
143 let new_region_routes =
144 Self::generate_region_routes(region_routes, &new_allocated_region_routes);
145 ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
146 .await?;
147 ctx.invalidate_table_cache().await?;
148
149 ctx.persistent_ctx.plans = repartition_plan_entries;
150 ctx.update_allocate_region_elapsed(timer.elapsed());
151 Ok((Box::new(Dispatch), Status::executing(true)))
152 }
153
154 fn as_any(&self) -> &dyn Any {
155 self
156 }
157}
158
159impl AllocateRegion {
160 pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
161 Self { plan_entries }
162 }
163
164 fn register_operating_regions(
165 memory_region_keeper: &MemoryRegionKeeperRef,
166 region_routes: &[RegionRoute],
167 ) -> Result<Vec<OperatingRegionGuard>> {
168 let mut operating_guards = Vec::with_capacity(region_routes.len());
169 for (region_id, datanode_id) in operating_leader_regions(region_routes) {
170 let guard = memory_region_keeper
171 .register(datanode_id, region_id)
172 .context(error::RegionOperatingRaceSnafu {
173 peer_id: datanode_id,
174 region_id,
175 })?;
176 operating_guards.push(guard);
177 }
178 Ok(operating_guards)
179 }
180
181 fn generate_region_routes(
182 region_routes: &[RegionRoute],
183 new_allocated_region_ids: &[RegionRoute],
184 ) -> Vec<RegionRoute> {
185 let region_ids = region_routes
186 .iter()
187 .map(|r| r.region.id)
188 .collect::<HashSet<_>>();
189 let mut new_region_routes = region_routes.to_vec();
190 for new_allocated_region_id in new_allocated_region_ids {
191 if !region_ids.contains(&new_allocated_region_id.region.id) {
192 new_region_routes.push(new_allocated_region_id.clone());
193 }
194 }
195 new_region_routes
196 }
197
198 fn convert_to_repartition_plans(
203 table_id: TableId,
204 next_region_number: &mut RegionNumber,
205 plan_entries: &[AllocationPlanEntry],
206 ) -> Vec<RepartitionPlanEntry> {
207 plan_entries
208 .iter()
209 .map(|plan_entry| {
210 convert_allocation_plan_to_repartition_plan(
211 table_id,
212 next_region_number,
213 plan_entry,
214 )
215 })
216 .collect()
217 }
218
219 fn collect_allocate_regions(
221 repartition_plan_entries: &[RepartitionPlanEntry],
222 ) -> Vec<&RegionDescriptor> {
223 repartition_plan_entries
224 .iter()
225 .flat_map(|p| p.allocate_regions())
226 .collect()
227 }
228
229 fn prepare_region_allocation_data(
231 allocate_regions: &[&RegionDescriptor],
232 ) -> Result<Vec<(RegionNumber, String)>> {
233 allocate_regions
234 .iter()
235 .map(|r| {
236 Ok((
237 r.region_id.region_number(),
238 r.partition_expr
239 .as_json_str()
240 .context(error::SerializePartitionExprSnafu)?,
241 ))
242 })
243 .collect()
244 }
245
246 fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
248 repartition_plan_entries
249 .iter()
250 .map(|p| p.allocated_region_ids.len())
251 .sum()
252 }
253
254 fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
256 max_region_number + 1
257 }
258
259 async fn allocate_regions(
260 node_manager: &NodeManagerRef,
261 raw_table_info: &RawTableInfo,
262 region_routes: &[RegionRoute],
263 wal_options: &HashMap<RegionNumber, String>,
264 ) -> Result<()> {
265 let table_ref = TableReference::full(
266 &raw_table_info.catalog_name,
267 &raw_table_info.schema_name,
268 &raw_table_info.name,
269 );
270 let table_id = raw_table_info.ident.table_id;
271 let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
274 .context(error::BuildCreateRequestSnafu { table_id })?;
275 common_telemetry::debug!(
276 "Allocating regions request, table_id: {}, request: {:?}",
277 table_id,
278 request
279 );
280 let builder = CreateRequestBuilder::new(request, None);
281 let region_count = region_routes.len();
282 let wal_region_count = wal_options.len();
283 info!(
284 "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
285 table_id, region_count, wal_region_count
286 );
287 let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
288 executor
289 .on_create_regions(node_manager, table_id, region_routes, wal_options)
290 .await
291 .context(error::AllocateRegionsSnafu { table_id })?;
292
293 Ok(())
294 }
295}
296
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Table id shared by every test case.
    const TABLE_ID: TableId = 1024;
    /// Region number handed to the first newly allocated region in each test.
    const FIRST_NEW_REGION: RegionNumber = 10;

    /// Builds a descriptor for region `region_number` of `table_id`, partitioned by
    /// `range_expr(col, start, end)`.
    fn region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        RegionDescriptor {
            region_id,
            partition_expr: range_expr(col, start, end),
        }
    }

    /// Builds an allocation plan entry on column `x`.
    ///
    /// The source region at position `i` gets bounds `i * 100` and `(i + 1) * 100`;
    /// each `(start, end)` pair in `target_ranges` becomes one target partition
    /// expression.
    fn plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let mut source_regions = Vec::with_capacity(source_region_numbers.len());
        for (position, &number) in source_region_numbers.iter().enumerate() {
            let lower = position as i64 * 100;
            let upper = lower + 100;
            source_regions.push(region_descriptor(table_id, number, "x", lower, upper));
        }

        let mut target_partition_exprs = Vec::with_capacity(target_ranges.len());
        for &(start, end) in target_ranges {
            target_partition_exprs.push(range_expr("x", start, end));
        }

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    /// Runs `convert_to_repartition_plans` starting at `FIRST_NEW_REGION` and returns
    /// the plans together with the next unused region number.
    fn convert(plan_entries: &[AllocationPlanEntry]) -> (Vec<RepartitionPlanEntry>, RegionNumber) {
        let mut next_number = FIRST_NEW_REGION;
        let plans =
            AllocateRegion::convert_to_repartition_plans(TABLE_ID, &mut next_number, plan_entries);
        (plans, next_number)
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        // Two sources into two targets: nothing new to allocate.
        let (plans, next_number) =
            convert(&[plan_entry(TABLE_ID, &[1, 2], &[(0, 50), (50, 200)])]);

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 2);
        assert!(plan.allocated_region_ids.is_empty());
        assert_eq!(next_number, FIRST_NEW_REGION);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        // Two sources into four targets: two new regions, numbered from 10.
        let (plans, next_number) = convert(&[plan_entry(
            TABLE_ID,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )]);

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 4);
        assert_eq!(plan.allocated_region_ids.len(), 2);
        assert_eq!(plan.allocated_region_ids[0], RegionId::new(TABLE_ID, 10));
        assert_eq!(plan.allocated_region_ids[1], RegionId::new(TABLE_ID, 11));
        assert_eq!(next_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let (plans, next_number) = convert(&[
            plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            plan_entry(TABLE_ID, &[2, 3], &[(100, 150), (150, 200)]),
            plan_entry(TABLE_ID, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ]);

        let allocated_per_plan: Vec<usize> = plans
            .iter()
            .map(|plan| plan.allocated_region_ids.len())
            .collect();
        assert_eq!(allocated_per_plan, [1, 0, 2]);
        assert_eq!(next_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let (plans, _) = convert(&[
            plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            plan_entry(TABLE_ID, &[2, 3], &[(100, 200)]),
            plan_entry(TABLE_ID, &[4], &[(200, 250), (250, 300)]),
        ]);

        assert_eq!(AllocateRegion::count_regions_to_allocate(&plans), 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let (plans, _) = convert(&[
            plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            plan_entry(TABLE_ID, &[2], &[(100, 150), (150, 200)]),
        ]);

        let collected = AllocateRegion::collect_allocate_regions(&plans);
        let ids: Vec<RegionId> = collected.iter().map(|region| region.region_id).collect();
        assert_eq!(
            ids,
            [RegionId::new(TABLE_ID, 10), RegionId::new(TABLE_ID, 11)]
        );
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let regions = [
            region_descriptor(TABLE_ID, 10, "x", 0, 50),
            region_descriptor(TABLE_ID, 11, "x", 50, 100),
        ];
        let region_refs: Vec<&RegionDescriptor> = regions.iter().collect();

        let data = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();

        let numbers: Vec<RegionNumber> = data.iter().map(|(number, _)| *number).collect();
        assert_eq!(numbers, [10, 11]);
        assert!(data.iter().all(|(_, expr_json)| !expr_json.is_empty()));
    }
}