Skip to main content

meta_srv/procedure/repartition/
allocate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20    CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::peer::PeerAllocContext;
25use common_meta::rpc::router::RegionRoute;
26use common_procedure::{Context as ProcedureContext, Status};
27use common_telemetry::{debug, info};
28use serde::{Deserialize, Deserializer, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::storage::{RegionId, RegionNumber, TableId};
31use table::metadata::TableInfo;
32use table::table_reference::TableReference;
33use tokio::time::Instant;
34
35use crate::error::{self, Result};
36use crate::procedure::repartition::dispatch::Dispatch;
37use crate::procedure::repartition::plan::{
38    AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
39    convert_allocation_plan_to_repartition_plan,
40};
41use crate::procedure::repartition::{Context, State};
42
43#[derive(Debug, Clone, Serialize)]
44pub enum AllocateRegion {
45    Build(BuildPlan),
46    Execute(ExecutePlan),
47}
48
49impl<'de> Deserialize<'de> for AllocateRegion {
50    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
51    where
52        D: Deserializer<'de>,
53    {
54        #[derive(Deserialize)]
55        enum CurrentAllocateRegion {
56            Build(BuildPlan),
57            Execute(ExecutePlan),
58        }
59
60        #[derive(Deserialize)]
61        struct LegacyAllocateRegion {
62            plan_entries: Vec<AllocationPlanEntry>,
63        }
64
65        #[derive(Deserialize)]
66        #[serde(untagged)]
67        enum AllocateRegionRepr {
68            Current(CurrentAllocateRegion),
69            Legacy(LegacyAllocateRegion),
70        }
71
72        match AllocateRegionRepr::deserialize(deserializer)? {
73            AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
74                Ok(Self::Build(build_plan))
75            }
76            AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
77                Ok(Self::Execute(execute_plan))
78            }
79            AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
80                plan_entries: legacy.plan_entries,
81            })),
82        }
83    }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct BuildPlan {
88    plan_entries: Vec<AllocationPlanEntry>,
89}
90
91impl BuildPlan {
92    async fn next(
93        &mut self,
94        ctx: &mut Context,
95        _procedure_ctx: &ProcedureContext,
96    ) -> Result<(Box<dyn State>, Status)> {
97        let timer = Instant::now();
98        let table_id = ctx.persistent_ctx.table_id;
99        let table_route_value = ctx.get_table_route_value().await?;
100        let mut next_region_number =
101            AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
102
103        // Converts allocation plan to repartition plan.
104        let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
105            table_id,
106            &mut next_region_number,
107            &self.plan_entries,
108            table_route_value.region_routes().unwrap(),
109        )?;
110        let plan_count = repartition_plan_entries.len();
111        let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
112        info!(
113            "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
114            table_id, plan_count, to_allocate
115        );
116
117        // If no region to allocate, directly dispatch the plan.
118        if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
119            ctx.persistent_ctx.plans = repartition_plan_entries;
120            ctx.update_allocate_region_elapsed(timer.elapsed());
121            return Ok((Box::new(Dispatch), Status::executing(true)));
122        }
123
124        ctx.persistent_ctx.plans = repartition_plan_entries;
125        debug!(
126            "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
127            table_id,
128            timer.elapsed()
129        );
130        Ok((
131            Box::new(AllocateRegion::Execute(ExecutePlan)),
132            Status::executing(true),
133        ))
134    }
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct ExecutePlan;
139
140impl ExecutePlan {
141    async fn next(
142        &mut self,
143        ctx: &mut Context,
144        procedure_ctx: &ProcedureContext,
145    ) -> Result<(Box<dyn State>, Status)> {
146        let timer = Instant::now();
147        let table_id = ctx.persistent_ctx.table_id;
148        let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
149        let region_number_and_partition_exprs =
150            AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
151        let table_info_value = ctx.get_table_info_value().await?;
152        let new_allocated_region_routes = ctx
153            .region_routes_allocator
154            .allocate(
155                table_id,
156                &region_number_and_partition_exprs
157                    .iter()
158                    .map(|(n, p)| (*n, p.as_str()))
159                    .collect::<Vec<_>>(),
160                &PeerAllocContext::default(),
161            )
162            .await
163            .context(error::AllocateRegionRoutesSnafu { table_id })?;
164        let wal_options = ctx
165            .wal_options_allocator
166            .allocate(
167                &allocate_regions
168                    .iter()
169                    .map(|r| r.region_id.region_number())
170                    .collect::<Vec<_>>(),
171                table_info_value.table_info.meta.options.skip_wal,
172            )
173            .await
174            .context(error::AllocateWalOptionsSnafu { table_id })?;
175
176        let new_region_count = new_allocated_region_routes.len();
177        let new_regions_brief: Vec<_> = new_allocated_region_routes
178            .iter()
179            .map(|route| {
180                let region_id = route.region.id;
181                let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
182                format!("region_id: {}, peer: {}", region_id, peer)
183            })
184            .collect();
185        info!(
186            "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
187            table_id, new_region_count, new_regions_brief
188        );
189
190        // The table route metadata is not updated yet; register it in memory for region lease renewal.
191        let _operating_guards = Context::register_operating_regions(
192            &ctx.memory_region_keeper,
193            &new_allocated_region_routes,
194        )?;
195        // Allocates the regions on datanodes.
196        AllocateRegion::allocate_regions(
197            &ctx.node_manager,
198            &table_info_value.table_info,
199            &new_allocated_region_routes,
200            &wal_options,
201        )
202        .await?;
203
204        // Updates the table routes.
205        let table_lock = TableLock::Write(table_id).into();
206        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
207        // MUST refresh the table route value after acquiring the lock to avoid lost update.
208        // Otherwise, the new allocated regions might be overridden by concurrent repartition procedures.
209        let table_route_value = ctx.get_table_route_value().await?;
210        // Safety: it is physical table route value.
211        let region_routes = table_route_value.region_routes().unwrap();
212        let new_region_routes =
213            AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
214        ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
215            .await?;
216        ctx.invalidate_table_cache().await?;
217
218        ctx.update_allocate_region_elapsed(timer.elapsed());
219        Ok((Box::new(Dispatch), Status::executing(true)))
220    }
221}
222
223#[async_trait::async_trait]
224#[typetag::serde]
225impl State for AllocateRegion {
226    async fn next(
227        &mut self,
228        ctx: &mut Context,
229        procedure_ctx: &ProcedureContext,
230    ) -> Result<(Box<dyn State>, Status)> {
231        match self {
232            AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
233            AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
234        }
235    }
236
237    fn as_any(&self) -> &dyn Any {
238        self
239    }
240}
241
242impl AllocateRegion {
243    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
244        AllocateRegion::Build(BuildPlan { plan_entries })
245    }
246
247    fn generate_region_routes(
248        region_routes: &[RegionRoute],
249        new_allocated_region_ids: &[RegionRoute],
250    ) -> Vec<RegionRoute> {
251        let region_ids = region_routes
252            .iter()
253            .map(|r| r.region.id)
254            .collect::<HashSet<_>>();
255        let mut new_region_routes = region_routes.to_vec();
256        for new_allocated_region_id in new_allocated_region_ids {
257            if !region_ids.contains(&new_allocated_region_id.region.id) {
258                new_region_routes.push(new_allocated_region_id.clone());
259            }
260        }
261        new_region_routes
262    }
263
264    /// Converts allocation plan entries to repartition plan entries.
265    ///
266    /// This method converts allocation intents into concrete repartition plans,
267    /// updates `next_region_number` for newly allocated regions, and captures
268    /// each plan's `original_target_routes` from the current table-route view.
269    ///
270    /// This also persists each plan's pre-staging target routes for rollback.
271    fn convert_to_repartition_plans(
272        table_id: TableId,
273        next_region_number: &mut RegionNumber,
274        plan_entries: &[AllocationPlanEntry],
275        current_region_routes: &[RegionRoute],
276    ) -> Result<Vec<RepartitionPlanEntry>> {
277        let region_routes_map = current_region_routes
278            .iter()
279            .map(|route| (route.region.id, route))
280            .collect::<HashMap<_, _>>();
281
282        plan_entries
283            .iter()
284            .map(|plan_entry| {
285                let mut plan = convert_allocation_plan_to_repartition_plan(
286                    table_id,
287                    next_region_number,
288                    plan_entry,
289                );
290                Self::capture_plan_original_target_routes(&mut plan, &region_routes_map)?;
291                Ok(plan)
292            })
293            .collect()
294    }
295
296    fn capture_plan_original_target_routes(
297        plan: &mut RepartitionPlanEntry,
298        region_routes_map: &HashMap<RegionId, &RegionRoute>,
299    ) -> Result<()> {
300        // Persist the pre-staging target-route view on the parent plan.
301        // Newly allocated targets are skipped because rollback deletes their
302        // route metadata rather than restoring an original target route.
303        let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
304        for target in &plan.target_regions {
305            if plan.allocated_region_ids.contains(&target.region_id) {
306                // This target region is to be allocated, so it doesn't exist in current routes.
307                continue;
308            }
309            let route = region_routes_map.get(&target.region_id).context(
310                error::RepartitionTargetRegionMissingSnafu {
311                    group_id: plan.group_id,
312                    region_id: target.region_id,
313                },
314            )?;
315            {
316                original_target_routes.push((*route).clone());
317            }
318        }
319
320        plan.original_target_routes = original_target_routes;
321        Ok(())
322    }
323
324    /// Collects all regions that need to be allocated from the repartition plan entries.
325    fn collect_allocate_regions(
326        repartition_plan_entries: &[RepartitionPlanEntry],
327    ) -> Vec<&RegionDescriptor> {
328        repartition_plan_entries
329            .iter()
330            .flat_map(|p| p.allocate_regions())
331            .collect()
332    }
333
334    /// Prepares region allocation data: region numbers and their partition expressions.
335    fn prepare_region_allocation_data(
336        allocate_regions: &[&RegionDescriptor],
337    ) -> Result<Vec<(RegionNumber, String)>> {
338        allocate_regions
339            .iter()
340            .map(|r| {
341                Ok((
342                    r.region_id.region_number(),
343                    r.partition_expr
344                        .as_json_str()
345                        .context(error::SerializePartitionExprSnafu)?,
346                ))
347            })
348            .collect()
349    }
350
351    /// Calculates the total number of regions that need to be allocated.
352    fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
353        repartition_plan_entries
354            .iter()
355            .map(|p| p.allocated_region_ids.len())
356            .sum()
357    }
358
359    /// Gets the next region number from the physical table route.
360    fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
361        max_region_number + 1
362    }
363
364    async fn allocate_regions(
365        node_manager: &NodeManagerRef,
366        raw_table_info: &TableInfo,
367        region_routes: &[RegionRoute],
368        wal_options: &HashMap<RegionNumber, String>,
369    ) -> Result<()> {
370        let table_ref = TableReference::full(
371            &raw_table_info.catalog_name,
372            &raw_table_info.schema_name,
373            &raw_table_info.name,
374        );
375        let table_id = raw_table_info.ident.table_id;
376        // Repartition allocation targets physical regions, so exclude metric internal columns
377        // and derive primary keys from tag semantics.
378        let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
379            .context(error::BuildCreateRequestSnafu { table_id })?;
380        common_telemetry::debug!(
381            "Allocating regions request, table_id: {}, request: {:?}",
382            table_id,
383            request
384        );
385        let builder = CreateRequestBuilder::new(request, None);
386        let region_count = region_routes.len();
387        let wal_region_count = wal_options.len();
388        info!(
389            "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
390            table_id, region_count, wal_region_count
391        );
392        let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
393        executor
394            .on_create_regions(node_manager, table_id, region_routes, wal_options)
395            .await
396            .context(error::AllocateRegionsSnafu { table_id })?;
397
398        Ok(())
399    }
400}
401
402#[cfg(test)]
403mod tests {
404    use std::collections::HashMap;
405    use std::sync::Arc;
406
407    use common_meta::key::TableMetadataManagerRef;
408    use common_meta::key::datanode_table::DatanodeTableKey;
409    use common_meta::peer::Peer;
410    use common_meta::rpc::router::{Region, RegionRoute};
411    use common_meta::test_util::MockDatanodeManager;
412    use common_procedure::{ContextProvider, ProcedureId, ProcedureState};
413    use common_procedure_test::MockContextProvider;
414    use store_api::storage::RegionId;
415    use tokio::sync::watch;
416    use uuid::Uuid;
417
418    use super::*;
419    use crate::procedure::repartition::State;
420    use crate::procedure::repartition::test_util::{
421        TestingEnv, current_parent_region_routes, new_parent_context, range_expr,
422        test_region_wal_options,
423    };
424
425    fn create_region_descriptor(
426        table_id: TableId,
427        region_number: u32,
428        col: &str,
429        start: i64,
430        end: i64,
431    ) -> RegionDescriptor {
432        RegionDescriptor {
433            region_id: RegionId::new(table_id, region_number),
434            partition_expr: range_expr(col, start, end),
435        }
436    }
437
438    fn create_allocation_plan_entry(
439        table_id: TableId,
440        source_region_numbers: &[u32],
441        target_ranges: &[(i64, i64)],
442    ) -> AllocationPlanEntry {
443        let source_regions = source_region_numbers
444            .iter()
445            .enumerate()
446            .map(|(i, &n)| {
447                let start = i as i64 * 100;
448                let end = (i + 1) as i64 * 100;
449                create_region_descriptor(table_id, n, "x", start, end)
450            })
451            .collect();
452
453        let target_partition_exprs = target_ranges
454            .iter()
455            .map(|&(start, end)| range_expr("x", start, end))
456            .collect();
457
458        AllocationPlanEntry {
459            group_id: Uuid::new_v4(),
460            source_regions,
461            target_partition_exprs,
462            transition_map: vec![],
463        }
464    }
465
466    fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
467        region_numbers
468            .iter()
469            .map(|region_number| RegionRoute {
470                region: Region {
471                    id: RegionId::new(table_id, *region_number),
472                    ..Default::default()
473                },
474                leader_peer: Some(Peer::empty(1)),
475                ..Default::default()
476            })
477            .collect()
478    }
479
480    struct ConcurrentTableRouteUpdateProvider {
481        inner: MockContextProvider,
482        table_metadata_manager: TableMetadataManagerRef,
483        table_id: TableId,
484        concurrent_region_route: RegionRoute,
485        region_wal_options: HashMap<RegionNumber, String>,
486    }
487
488    #[async_trait::async_trait]
489    impl ContextProvider for ConcurrentTableRouteUpdateProvider {
490        async fn procedure_state(
491            &self,
492            procedure_id: ProcedureId,
493        ) -> common_procedure::Result<Option<ProcedureState>> {
494            self.inner.procedure_state(procedure_id).await
495        }
496
497        async fn procedure_state_receiver(
498            &self,
499            procedure_id: ProcedureId,
500        ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
501            self.inner.procedure_state_receiver(procedure_id).await
502        }
503
504        async fn try_put_poison(
505            &self,
506            key: &common_procedure::PoisonKey,
507            procedure_id: ProcedureId,
508        ) -> common_procedure::Result<()> {
509            self.inner.try_put_poison(key, procedure_id).await
510        }
511
512        async fn acquire_lock(
513            &self,
514            key: &common_procedure::StringKey,
515        ) -> common_procedure::local::DynamicKeyLockGuard {
516            let current_table_route_value = self
517                .table_metadata_manager
518                .table_route_manager()
519                .table_route_storage()
520                .get_with_raw_bytes(self.table_id)
521                .await
522                .unwrap()
523                .unwrap();
524            let mut region_routes = current_table_route_value.region_routes().unwrap().clone();
525
526            if !region_routes
527                .iter()
528                .any(|route| route.region.id == self.concurrent_region_route.region.id)
529            {
530                region_routes.push(self.concurrent_region_route.clone());
531                let datanode_id = current_table_route_value.region_routes().unwrap()[0]
532                    .leader_peer
533                    .as_ref()
534                    .unwrap()
535                    .id;
536                let datanode_table_value = self
537                    .table_metadata_manager
538                    .datanode_table_manager()
539                    .get(&DatanodeTableKey::new(datanode_id, self.table_id))
540                    .await
541                    .unwrap()
542                    .unwrap();
543                let region_options = &datanode_table_value.region_info.region_options;
544
545                self.table_metadata_manager
546                    .update_table_route(
547                        self.table_id,
548                        datanode_table_value.region_info.clone(),
549                        &current_table_route_value,
550                        region_routes,
551                        region_options,
552                        &self.region_wal_options,
553                    )
554                    .await
555                    .unwrap();
556            }
557
558            self.inner.acquire_lock(key).await
559        }
560    }
561
562    #[test]
563    fn test_convert_to_repartition_plans_no_allocation() {
564        let table_id = 1024;
565        let mut next_region_number = 10;
566
567        // 2 source -> 2 target (no allocation needed)
568        let plan_entries = vec![create_allocation_plan_entry(
569            table_id,
570            &[1, 2],
571            &[(0, 50), (50, 200)],
572        )];
573
574        let result = AllocateRegion::convert_to_repartition_plans(
575            table_id,
576            &mut next_region_number,
577            &plan_entries,
578            &create_current_region_routes(table_id, &[1, 2]),
579        )
580        .unwrap();
581
582        assert_eq!(result.len(), 1);
583        assert_eq!(result[0].target_regions.len(), 2);
584        assert!(result[0].allocated_region_ids.is_empty());
585        // next_region_number should not change
586        assert_eq!(next_region_number, 10);
587    }
588
589    #[test]
590    fn test_convert_to_repartition_plans_with_allocation() {
591        let table_id = 1024;
592        let mut next_region_number = 10;
593
594        // 2 source -> 4 target (need to allocate 2 regions)
595        let plan_entries = vec![create_allocation_plan_entry(
596            table_id,
597            &[1, 2],
598            &[(0, 50), (50, 100), (100, 150), (150, 200)],
599        )];
600
601        let result = AllocateRegion::convert_to_repartition_plans(
602            table_id,
603            &mut next_region_number,
604            &plan_entries,
605            &create_current_region_routes(table_id, &[1, 2]),
606        )
607        .unwrap();
608
609        assert_eq!(result.len(), 1);
610        assert_eq!(result[0].target_regions.len(), 4);
611        assert_eq!(result[0].allocated_region_ids.len(), 2);
612        assert_eq!(
613            result[0].allocated_region_ids[0],
614            RegionId::new(table_id, 10)
615        );
616        assert_eq!(
617            result[0].allocated_region_ids[1],
618            RegionId::new(table_id, 11)
619        );
620        // next_region_number should be incremented by 2
621        assert_eq!(next_region_number, 12);
622    }
623
624    #[test]
625    fn test_convert_to_repartition_plans_multiple_entries() {
626        let table_id = 1024;
627        let mut next_region_number = 10;
628
629        // Multiple plan entries with different allocation needs
630        let plan_entries = vec![
631            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // need 1 allocation
632            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]), // no allocation
633            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]), // need 2 allocations
634        ];
635
636        let result = AllocateRegion::convert_to_repartition_plans(
637            table_id,
638            &mut next_region_number,
639            &plan_entries,
640            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
641        )
642        .unwrap();
643
644        assert_eq!(result.len(), 3);
645        assert_eq!(result[0].allocated_region_ids.len(), 1);
646        assert_eq!(result[1].allocated_region_ids.len(), 0);
647        assert_eq!(result[2].allocated_region_ids.len(), 2);
648        // next_region_number should be incremented by 3 total
649        assert_eq!(next_region_number, 13);
650    }
651
652    #[test]
653    fn test_count_regions_to_allocate() {
654        let table_id = 1024;
655        let mut next_region_number = 10;
656
657        let plan_entries = vec![
658            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation
659            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]), // 0 allocation (deallocate)
660            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]), // 1 allocation
661        ];
662
663        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
664            table_id,
665            &mut next_region_number,
666            &plan_entries,
667            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
668        )
669        .unwrap();
670
671        let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
672        assert_eq!(count, 2);
673    }
674
675    #[test]
676    fn test_collect_allocate_regions() {
677        let table_id = 1024;
678        let mut next_region_number = 10;
679
680        let plan_entries = vec![
681            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation
682            create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]), // 1 allocation
683        ];
684
685        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
686            table_id,
687            &mut next_region_number,
688            &plan_entries,
689            &create_current_region_routes(table_id, &[1, 2]),
690        )
691        .unwrap();
692
693        let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
694        assert_eq!(allocate_regions.len(), 2);
695        assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10));
696        assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11));
697    }
698
699    #[test]
700    fn test_prepare_region_allocation_data() {
701        let table_id = 1024;
702        let regions = [
703            create_region_descriptor(table_id, 10, "x", 0, 50),
704            create_region_descriptor(table_id, 11, "x", 50, 100),
705        ];
706        let region_refs: Vec<&RegionDescriptor> = regions.iter().collect();
707
708        let result = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();
709
710        assert_eq!(result.len(), 2);
711        assert_eq!(result[0].0, 10);
712        assert_eq!(result[1].0, 11);
713        // Verify partition expressions are serialized
714        assert!(!result[0].1.is_empty());
715        assert!(!result[1].1.is_empty());
716    }
717
718    #[tokio::test]
719    async fn test_execute_plan_uses_latest_table_route_after_lock() {
720        let env = TestingEnv::new();
721        let table_id = 1024;
722        let original_region_routes = create_current_region_routes(table_id, &[1]);
723        env.create_physical_table_metadata_for_repartition(
724            table_id,
725            original_region_routes,
726            test_region_wal_options(&[1]),
727        )
728        .await;
729
730        let node_manager = Arc::new(MockDatanodeManager::new(()));
731        let mut ctx = new_parent_context(&env, node_manager, table_id);
732        ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
733            group_id: Uuid::new_v4(),
734            source_regions: vec![],
735            target_regions: vec![create_region_descriptor(table_id, 3, "x", 0, 100)],
736            allocated_region_ids: vec![RegionId::new(table_id, 3)],
737            pending_deallocate_region_ids: vec![],
738            transition_map: vec![],
739            original_target_routes: vec![],
740        }];
741        let concurrent_region_route = create_current_region_routes(table_id, &[2])
742            .into_iter()
743            .next()
744            .unwrap();
745        let procedure_ctx = ProcedureContext {
746            procedure_id: ProcedureId::random(),
747            provider: Arc::new(ConcurrentTableRouteUpdateProvider {
748                inner: MockContextProvider::default(),
749                table_metadata_manager: env.table_metadata_manager.clone(),
750                table_id,
751                concurrent_region_route,
752                region_wal_options: test_region_wal_options(&[1, 2]),
753            }),
754        };
755        let mut state = ExecutePlan;
756
757        state.next(&mut ctx, &procedure_ctx).await.unwrap();
758
759        let region_ids = current_parent_region_routes(&ctx)
760            .await
761            .into_iter()
762            .map(|route| route.region.id)
763            .collect::<Vec<_>>();
764        assert_eq!(
765            region_ids,
766            vec![
767                RegionId::new(table_id, 1),
768                RegionId::new(table_id, 2),
769                RegionId::new(table_id, 3),
770            ]
771        );
772    }
773
774    #[test]
775    fn test_allocate_region_state_backward_compatibility() {
776        // Arrange
777        let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;
778
779        // Act
780        let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();
781
782        // Assert
783        let allocate_region = state
784            .as_any()
785            .downcast_ref::<AllocateRegion>()
786            .expect("expected AllocateRegion state");
787        match allocate_region {
788            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
789            AllocateRegion::Execute(_) => panic!("expected build plan"),
790        }
791    }
792
793    #[test]
794    fn test_allocate_region_state_round_trip() {
795        // Arrange
796        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));
797
798        // Act
799        let serialized = serde_json::to_string(&state).unwrap();
800        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();
801
802        // Assert
803        assert_eq!(
804            serialized,
805            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
806        );
807        let allocate_region = deserialized
808            .as_any()
809            .downcast_ref::<AllocateRegion>()
810            .expect("expected AllocateRegion state");
811        match allocate_region {
812            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
813            AllocateRegion::Execute(_) => panic!("expected build plan"),
814        }
815    }
816
817    #[test]
818    fn test_allocate_region_execute_state_round_trip() {
819        // Arrange
820        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));
821
822        // Act
823        let serialized = serde_json::to_string(&state).unwrap();
824        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();
825
826        // Assert
827        assert_eq!(
828            serialized,
829            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
830        );
831        let allocate_region = deserialized
832            .as_any()
833            .downcast_ref::<AllocateRegion>()
834            .expect("expected AllocateRegion state");
835        match allocate_region {
836            AllocateRegion::Execute(_) => {}
837            AllocateRegion::Build(_) => panic!("expected execute plan"),
838        }
839    }
840}