Skip to main content

meta_srv/procedure/repartition/
allocate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20    CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::peer::PeerAllocContext;
25use common_meta::rpc::router::RegionRoute;
26use common_meta::wal_provider::{
27    RegionWalOptions, acquire_remote_wal_read_locks, refresh_initial_pruned_entry_ids,
28};
29use common_procedure::{Context as ProcedureContext, Status};
30use common_telemetry::{debug, info};
31use serde::{Deserialize, Deserializer, Serialize};
32use snafu::{OptionExt, ResultExt};
33use store_api::region_request::RegionRequirements;
34use store_api::storage::{RegionId, RegionNumber, TableId};
35use table::metadata::TableInfo;
36use table::table_reference::TableReference;
37use tokio::time::Instant;
38
39use crate::error::{self, Result};
40use crate::procedure::repartition::dispatch::Dispatch;
41use crate::procedure::repartition::plan::{
42    AllocationPlanEntry, RepartitionPlanEntry, TargetRegionDescriptor,
43    convert_allocation_plan_to_repartition_plan,
44};
45use crate::procedure::repartition::{Context, State};
46
47#[derive(Debug, Clone, Serialize)]
48pub enum AllocateRegion {
49    Build(BuildPlan),
50    Execute(ExecutePlan),
51}
52
53impl<'de> Deserialize<'de> for AllocateRegion {
54    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
55    where
56        D: Deserializer<'de>,
57    {
58        #[derive(Deserialize)]
59        enum CurrentAllocateRegion {
60            Build(BuildPlan),
61            Execute(ExecutePlan),
62        }
63
64        #[derive(Deserialize)]
65        struct LegacyAllocateRegion {
66            plan_entries: Vec<AllocationPlanEntry>,
67        }
68
69        #[derive(Deserialize)]
70        #[serde(untagged)]
71        enum AllocateRegionRepr {
72            Current(CurrentAllocateRegion),
73            Legacy(LegacyAllocateRegion),
74        }
75
76        match AllocateRegionRepr::deserialize(deserializer)? {
77            AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
78                Ok(Self::Build(build_plan))
79            }
80            AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
81                Ok(Self::Execute(execute_plan))
82            }
83            AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
84                plan_entries: legacy.plan_entries,
85            })),
86        }
87    }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct BuildPlan {
92    plan_entries: Vec<AllocationPlanEntry>,
93}
94
95impl BuildPlan {
96    async fn next(
97        &mut self,
98        ctx: &mut Context,
99        _procedure_ctx: &ProcedureContext,
100    ) -> Result<(Box<dyn State>, Status)> {
101        let timer = Instant::now();
102        let table_id = ctx.persistent_ctx.table_id;
103        let table_route_value = ctx.get_table_route_value().await?;
104        let mut next_region_number =
105            AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
106
107        // Converts allocation plan to repartition plan.
108        let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
109            table_id,
110            &mut next_region_number,
111            &self.plan_entries,
112            table_route_value.region_routes().unwrap(),
113        )?;
114        let plan_count = repartition_plan_entries.len();
115        let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
116        info!(
117            "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
118            table_id, plan_count, to_allocate
119        );
120
121        // If no region to allocate, directly dispatch the plan.
122        if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
123            ctx.persistent_ctx.plans = repartition_plan_entries;
124            ctx.update_allocate_region_elapsed(timer.elapsed());
125            return Ok((Box::new(Dispatch), Status::executing(true)));
126        }
127
128        ctx.persistent_ctx.plans = repartition_plan_entries;
129        debug!(
130            "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
131            table_id,
132            timer.elapsed()
133        );
134        Ok((
135            Box::new(AllocateRegion::Execute(ExecutePlan)),
136            Status::executing(true),
137        ))
138    }
139}
140
/// Execute phase of [`AllocateRegion`].
///
/// Carries no data of its own: it reads the repartition plans persisted in
/// `ctx.persistent_ctx.plans` by the build phase.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutePlan;

impl ExecutePlan {
    /// Allocates every region the repartition plans mark for allocation, then
    /// transitions to [`Dispatch`].
    ///
    /// Steps, in order:
    /// 1. allocate region routes (peers) and WAL options for the new regions;
    /// 2. take remote WAL read locks and refresh initial pruned entry ids;
    /// 3. register the new regions as operating regions in memory;
    /// 4. create the regions on datanodes;
    /// 5. under the table write lock, re-read the table route, append the new
    ///    routes, persist it and invalidate the table cache.
    ///
    /// # Errors
    /// Propagates failures from partition-expr serialization, route/WAL allocation,
    /// operating-region registration, region creation on datanodes, the table route
    /// update, and cache invalidation.
    ///
    /// # Panics
    /// Panics if the re-read table route has no region routes (`unwrap` below).
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let timer = Instant::now();
        let table_id = ctx.persistent_ctx.table_id;
        // Target regions flagged for allocation across all repartition plans.
        let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
        let region_number_and_partition_exprs =
            AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
        let table_info_value = ctx.get_table_info_value().await?;
        // Picks a route (leader peer) for each new region number.
        let new_allocated_region_routes = ctx
            .region_routes_allocator
            .allocate(
                table_id,
                &region_number_and_partition_exprs
                    .iter()
                    .map(|(n, p)| (*n, p.as_str()))
                    .collect::<Vec<_>>(),
                &PeerAllocContext::default(),
            )
            .await
            .context(error::AllocateRegionRoutesSnafu { table_id })?;
        let mut wal_options = ctx
            .wal_options_allocator
            .allocate(
                &allocate_regions
                    .iter()
                    .map(|r| r.region_id.region_number())
                    .collect::<Vec<_>>(),
                table_info_value.table_info.meta.options.skip_wal,
            )
            .await
            .context(error::AllocateWalOptionsSnafu { table_id })?;
        // The named `_`-prefixed binding keeps the guards alive until this function
        // returns. NOTE(review): presumably this stops the remote WAL from being
        // pruned while the initial pruned entry ids are refreshed and used; confirm.
        let _remote_wal_lock_guards =
            acquire_remote_wal_read_locks(procedure_ctx, &wal_options).await;
        refresh_initial_pruned_entry_ids(&ctx.table_metadata_manager, &mut wal_options)
            .await
            .context(error::AllocateWalOptionsSnafu { table_id })?;

        let new_region_count = new_allocated_region_routes.len();
        // Human-readable "region_id / peer" summary for the log line below;
        // a route without a leader is logged with peer id 0 (`unwrap_or_default`).
        let new_regions_brief: Vec<_> = new_allocated_region_routes
            .iter()
            .map(|route| {
                let region_id = route.region.id;
                let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
                format!("region_id: {}, peer: {}", region_id, peer)
            })
            .collect();
        info!(
            "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
            table_id, new_region_count, new_regions_brief
        );

        // The table route metadata is not updated yet; register it in memory for region lease renewal.
        let _operating_guards = Context::register_operating_regions(
            &ctx.memory_region_keeper,
            &new_allocated_region_routes,
        )?;
        // Allocates the regions on datanodes.
        AllocateRegion::allocate_regions(
            &ctx.node_manager,
            &table_info_value.table_info,
            &new_allocated_region_routes,
            &wal_options,
        )
        .await?;

        // Updates the table routes.
        let table_lock = TableLock::Write(table_id).into();
        // Held until the function returns, i.e. across the route update and cache invalidation.
        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
        // MUST refresh the table route value after acquiring the lock to avoid lost update.
        // Otherwise, the new allocated regions might be overridden by concurrent repartition procedures.
        let table_route_value = ctx.get_table_route_value().await?;
        // Safety: it is physical table route value.
        let region_routes = table_route_value.region_routes().unwrap();
        // Existing routes are kept; new routes are appended unless already present.
        let new_region_routes =
            AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
        ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
            .await?;
        ctx.invalidate_table_cache().await?;

        ctx.update_allocate_region_elapsed(timer.elapsed());
        Ok((Box::new(Dispatch), Status::executing(true)))
    }
}
231
232#[async_trait::async_trait]
233#[typetag::serde]
234impl State for AllocateRegion {
235    async fn next(
236        &mut self,
237        ctx: &mut Context,
238        procedure_ctx: &ProcedureContext,
239    ) -> Result<(Box<dyn State>, Status)> {
240        match self {
241            AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
242            AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
243        }
244    }
245
246    fn as_any(&self) -> &dyn Any {
247        self
248    }
249}
250
251impl AllocateRegion {
252    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
253        AllocateRegion::Build(BuildPlan { plan_entries })
254    }
255
256    fn generate_region_routes(
257        region_routes: &[RegionRoute],
258        new_allocated_region_ids: &[RegionRoute],
259    ) -> Vec<RegionRoute> {
260        let region_ids = region_routes
261            .iter()
262            .map(|r| r.region.id)
263            .collect::<HashSet<_>>();
264        let mut new_region_routes = region_routes.to_vec();
265        for new_allocated_region_id in new_allocated_region_ids {
266            if !region_ids.contains(&new_allocated_region_id.region.id) {
267                new_region_routes.push(new_allocated_region_id.clone());
268            }
269        }
270        new_region_routes
271    }
272
273    /// Converts allocation plan entries to repartition plan entries.
274    ///
275    /// This method converts allocation intents into concrete repartition plans,
276    /// updates `next_region_number` for newly allocated regions, and captures
277    /// each plan's `original_target_routes` from the current table-route view.
278    ///
279    /// This also persists each plan's pre-staging target routes for rollback.
280    fn convert_to_repartition_plans(
281        table_id: TableId,
282        next_region_number: &mut RegionNumber,
283        plan_entries: &[AllocationPlanEntry],
284        current_region_routes: &[RegionRoute],
285    ) -> Result<Vec<RepartitionPlanEntry>> {
286        let region_routes_map = current_region_routes
287            .iter()
288            .map(|route| (route.region.id, route))
289            .collect::<HashMap<_, _>>();
290
291        plan_entries
292            .iter()
293            .map(|plan_entry| {
294                let mut plan = convert_allocation_plan_to_repartition_plan(
295                    table_id,
296                    next_region_number,
297                    plan_entry,
298                );
299                Self::capture_plan_original_target_routes(&mut plan, &region_routes_map)?;
300                Ok(plan)
301            })
302            .collect()
303    }
304
305    fn capture_plan_original_target_routes(
306        plan: &mut RepartitionPlanEntry,
307        region_routes_map: &HashMap<RegionId, &RegionRoute>,
308    ) -> Result<()> {
309        // Persist the pre-staging target-route view on the parent plan.
310        // Newly allocated targets are skipped because rollback deletes their
311        // route metadata rather than restoring an original target route.
312        let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
313        for target in &plan.target_regions {
314            if plan.allocated_region_ids.contains(&target.region_id) {
315                // This target region is to be allocated, so it doesn't exist in current routes.
316                continue;
317            }
318            let route = region_routes_map.get(&target.region_id).context(
319                error::RepartitionTargetRegionMissingSnafu {
320                    group_id: plan.group_id,
321                    region_id: target.region_id,
322                },
323            )?;
324            {
325                original_target_routes.push((*route).clone());
326            }
327        }
328
329        plan.original_target_routes = original_target_routes;
330        Ok(())
331    }
332
333    /// Collects all regions that need to be allocated from the repartition plan entries.
334    fn collect_allocate_regions(
335        repartition_plan_entries: &[RepartitionPlanEntry],
336    ) -> Vec<&TargetRegionDescriptor> {
337        repartition_plan_entries
338            .iter()
339            .flat_map(|p| p.allocate_regions())
340            .collect()
341    }
342
343    /// Prepares region allocation data: region numbers and their partition expressions.
344    fn prepare_region_allocation_data(
345        allocate_regions: &[&TargetRegionDescriptor],
346    ) -> Result<Vec<(RegionNumber, String)>> {
347        allocate_regions
348            .iter()
349            .map(|r| {
350                Ok((
351                    r.region_id.region_number(),
352                    r.partition_expr
353                        .as_json_str()
354                        .context(error::SerializePartitionExprSnafu)?,
355                ))
356            })
357            .collect()
358    }
359
360    /// Calculates the total number of regions that need to be allocated.
361    fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
362        repartition_plan_entries
363            .iter()
364            .map(|p| p.allocated_region_ids.len())
365            .sum()
366    }
367
368    /// Gets the next region number from the physical table route.
369    fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
370        max_region_number + 1
371    }
372
373    async fn allocate_regions(
374        node_manager: &NodeManagerRef,
375        raw_table_info: &TableInfo,
376        region_routes: &[RegionRoute],
377        wal_options: &RegionWalOptions,
378    ) -> Result<()> {
379        let table_ref = TableReference::full(
380            &raw_table_info.catalog_name,
381            &raw_table_info.schema_name,
382            &raw_table_info.name,
383        );
384        let table_id = raw_table_info.ident.table_id;
385        // Repartition allocation targets physical regions, so exclude metric internal columns
386        // and derive primary keys from tag semantics.
387        let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
388            .context(error::BuildCreateRequestSnafu { table_id })?;
389        common_telemetry::debug!(
390            "Allocating regions request, table_id: {}, request: {:?}",
391            table_id,
392            request
393        );
394        let builder = CreateRequestBuilder::new(request, None)
395            .with_requirements(RegionRequirements::object_storage());
396        let region_count = region_routes.len();
397        let wal_region_count = wal_options.len();
398        info!(
399            "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
400            table_id, region_count, wal_region_count
401        );
402        let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
403        executor
404            .on_create_regions(node_manager, table_id, region_routes, wal_options)
405            .await
406            .context(error::AllocateRegionsSnafu { table_id })?;
407
408        Ok(())
409    }
410}
411
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::region_request::Body;
    use common_meta::ddl::test_util::datanode_handler::DatanodeWatcher;
    use common_meta::key::TableMetadataManagerRef;
    use common_meta::key::datanode_table::DatanodeTableKey;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::MockDatanodeManager;
    use common_procedure::{ContextProvider, ProcedureId, ProcedureState};
    use common_procedure_test::MockContextProvider;
    use store_api::storage::RegionId;
    use tokio::sync::{mpsc, watch};
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::plan::SourceRegionDescriptor;
    use crate::procedure::repartition::test_util::{
        TestingEnv, current_parent_region_routes, new_parent_context, range_expr,
        test_region_wal_options,
    };

    /// Builds a partitioned source region descriptor for `region_number` whose
    /// partition expression is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> SourceRegionDescriptor {
        SourceRegionDescriptor::partitioned(
            RegionId::new(table_id, region_number),
            range_expr(col, start, end),
        )
    }

    /// Builds a target region descriptor for `region_number` whose partition
    /// expression is `range_expr(col, start, end)`.
    fn create_target_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> TargetRegionDescriptor {
        TargetRegionDescriptor {
            region_id: RegionId::new(table_id, region_number),
            partition_expr: range_expr(col, start, end),
        }
    }

    /// Builds an allocation plan entry with a fresh group id.
    ///
    /// The i-th source region gets the expression `range_expr("x", i*100, (i+1)*100)`;
    /// each `(start, end)` in `target_ranges` becomes a target expression on column
    /// `x`. The transition map is left empty.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let source_regions = source_region_numbers
            .iter()
            .enumerate()
            .map(|(i, &n)| {
                let start = i as i64 * 100;
                let end = (i + 1) as i64 * 100;
                create_region_descriptor(table_id, n, "x", start, end)
            })
            .collect();

        let target_partition_exprs = target_ranges
            .iter()
            .map(|&(start, end)| range_expr("x", start, end))
            .collect();

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    /// Builds one route per region number, each led by peer 1.
    fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
        region_numbers
            .iter()
            .map(|region_number| RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, *region_number),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            })
            .collect()
    }

    /// Context provider that simulates a concurrent table-route update.
    ///
    /// On `acquire_lock`, it appends `concurrent_region_route` to the stored table
    /// route (if not already present) before delegating to `inner`. Every other
    /// method delegates to `inner` directly.
    struct ConcurrentTableRouteUpdateProvider {
        inner: MockContextProvider,
        table_metadata_manager: TableMetadataManagerRef,
        table_id: TableId,
        concurrent_region_route: RegionRoute,
        region_wal_options: RegionWalOptions,
    }

    #[async_trait::async_trait]
    impl ContextProvider for ConcurrentTableRouteUpdateProvider {
        async fn procedure_state(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<ProcedureState>> {
            self.inner.procedure_state(procedure_id).await
        }

        async fn procedure_state_receiver(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
            self.inner.procedure_state_receiver(procedure_id).await
        }

        async fn try_put_poison(
            &self,
            key: &common_procedure::PoisonKey,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<()> {
            self.inner.try_put_poison(key, procedure_id).await
        }

        /// Writes the "concurrent" route into metadata, then acquires the lock via `inner`.
        async fn acquire_lock(
            &self,
            key: &common_procedure::StringKey,
        ) -> common_procedure::local::DynamicKeyLockGuard {
            let current_table_route_value = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(self.table_id)
                .await
                .unwrap()
                .unwrap();
            let mut region_routes = current_table_route_value.region_routes().unwrap().clone();

            // Only inject once, so repeated lock acquisitions do not duplicate the route.
            if !region_routes
                .iter()
                .any(|route| route.region.id == self.concurrent_region_route.region.id)
            {
                region_routes.push(self.concurrent_region_route.clone());
                // Reuse the datanode-table info of the first route's leader for the update.
                let datanode_id = current_table_route_value.region_routes().unwrap()[0]
                    .leader_peer
                    .as_ref()
                    .unwrap()
                    .id;
                let datanode_table_value = self
                    .table_metadata_manager
                    .datanode_table_manager()
                    .get(&DatanodeTableKey::new(datanode_id, self.table_id))
                    .await
                    .unwrap()
                    .unwrap();
                let region_options = &datanode_table_value.region_info.region_options;

                self.table_metadata_manager
                    .update_table_route(
                        self.table_id,
                        datanode_table_value.region_info.clone(),
                        &current_table_route_value,
                        region_routes,
                        region_options,
                        &self.region_wal_options,
                    )
                    .await
                    .unwrap();
            }

            self.inner.acquire_lock(key).await
        }
    }

    /// Equal source and target counts: no allocation and `next_region_number` is untouched.
    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // 2 source -> 2 target (no allocation needed)
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 2);
        assert!(result[0].allocated_region_ids.is_empty());
        // next_region_number should not change
        assert_eq!(next_region_number, 10);
    }

    /// Extra targets are allocated consecutive region numbers starting at `next_region_number`.
    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // 2 source -> 4 target (need to allocate 2 regions)
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 4);
        assert_eq!(result[0].allocated_region_ids.len(), 2);
        assert_eq!(
            result[0].allocated_region_ids[0],
            RegionId::new(table_id, 10)
        );
        assert_eq!(
            result[0].allocated_region_ids[1],
            RegionId::new(table_id, 11)
        );
        // next_region_number should be incremented by 2
        assert_eq!(next_region_number, 12);
    }

    /// `next_region_number` is shared across plan entries and advances by the total allocated.
    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Multiple plan entries with different allocation needs
        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // need 1 allocation
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]), // no allocation
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]), // need 2 allocations
        ];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
        )
        .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0].allocated_region_ids.len(), 1);
        assert_eq!(result[1].allocated_region_ids.len(), 0);
        assert_eq!(result[2].allocated_region_ids.len(), 2);
        // next_region_number should be incremented by 3 total
        assert_eq!(next_region_number, 13);
    }

    /// Sums allocations across plans; merge plans (fewer targets) contribute zero.
    #[test]
    fn test_count_regions_to_allocate() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]), // 0 allocation (deallocate)
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]), // 1 allocation
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
        )
        .unwrap();

        let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
        assert_eq!(count, 2);
    }

    /// Collected allocation targets follow plan order and carry the assigned region ids.
    #[test]
    fn test_collect_allocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation
            create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]), // 1 allocation
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
        assert_eq!(allocate_regions.len(), 2);
        assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10));
        assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11));
    }

    /// Region numbers are preserved in order and partition expressions serialize to non-empty strings.
    #[test]
    fn test_prepare_region_allocation_data() {
        let table_id = 1024;
        let regions = [
            create_target_region_descriptor(table_id, 10, "x", 0, 50),
            create_target_region_descriptor(table_id, 11, "x", 50, 100),
        ];
        let region_refs: Vec<&TargetRegionDescriptor> = regions.iter().collect();

        let result = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, 10);
        assert_eq!(result[1].0, 11);
        // Verify partition expressions are serialized
        assert!(!result[0].1.is_empty());
        assert!(!result[1].1.is_empty());
    }

    /// Region 2 is written by a simulated concurrent update while the table lock
    /// is being acquired. The execute phase must re-read the route under the lock,
    /// so the final route contains regions 1, 2 and the newly allocated 3. Also
    /// checks that the create request asks for object storage.
    #[tokio::test]
    async fn test_execute_plan_uses_latest_table_route_after_lock() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let original_region_routes = create_current_region_routes(table_id, &[1]);
        env.create_physical_table_metadata_for_repartition(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1]),
        )
        .await;

        let (sender, mut receiver) = mpsc::channel(1);
        let node_manager = Arc::new(MockDatanodeManager::new(DatanodeWatcher::new(sender)));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![],
            target_regions: vec![create_target_region_descriptor(table_id, 3, "x", 0, 100)],
            allocated_region_ids: vec![RegionId::new(table_id, 3)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![],
            original_target_routes: vec![],
        }];
        let concurrent_region_route = create_current_region_routes(table_id, &[2])
            .into_iter()
            .next()
            .unwrap();
        let procedure_ctx = ProcedureContext {
            procedure_id: ProcedureId::random(),
            provider: Arc::new(ConcurrentTableRouteUpdateProvider {
                inner: MockContextProvider::default(),
                table_metadata_manager: env.table_metadata_manager.clone(),
                table_id,
                concurrent_region_route,
                region_wal_options: test_region_wal_options(&[1, 2]),
            }),
        };
        let mut state = ExecutePlan;

        state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let (_, request) = receiver.recv().await.unwrap();
        let Some(Body::Create(create)) = request.body else {
            unreachable!()
        };
        assert!(create.requirements.unwrap().object_storage);

        let region_ids = current_parent_region_routes(&ctx)
            .await
            .into_iter()
            .map(|route| route.region.id)
            .collect::<Vec<_>>();
        assert_eq!(
            region_ids,
            vec![
                RegionId::new(table_id, 1),
                RegionId::new(table_id, 2),
                RegionId::new(table_id, 3),
            ]
        );
    }

    /// The legacy encoding (plan entries without a phase tag) deserializes into the build phase.
    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Arrange
        let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;

        // Act
        let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();

        // Assert
        let allocate_region = state
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    /// Pins the current wire format of the build phase and checks it round-trips.
    #[test]
    fn test_allocate_region_state_round_trip() {
        // Arrange
        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));

        // Act
        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        // Assert
        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
        );
        let allocate_region = deserialized
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    /// Pins the current wire format of the execute phase (unit payload as `null`)
    /// and checks it round-trips.
    #[test]
    fn test_allocate_region_execute_state_round_trip() {
        // Arrange
        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));

        // Act
        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        // Assert
        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
        );
        let allocate_region = deserialized
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Execute(_) => {}
            AllocateRegion::Build(_) => panic!("expected execute plan"),
        }
    }
}