Skip to main content

meta_srv/procedure/repartition/
allocate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20    CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::{debug, info};
27use serde::{Deserialize, Deserializer, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::storage::{RegionId, RegionNumber, TableId};
30use table::metadata::TableInfo;
31use table::table_reference::TableReference;
32use tokio::time::Instant;
33
34use crate::error::{self, Result};
35use crate::procedure::repartition::dispatch::Dispatch;
36use crate::procedure::repartition::plan::{
37    AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
38    convert_allocation_plan_to_repartition_plan,
39};
40use crate::procedure::repartition::{Context, State};
41
/// Repartition state that allocates the regions required by the plan.
///
/// The state has two phases, one per variant. `Serialize` is derived, while
/// `Deserialize` is implemented by hand so that the older layout, which only
/// carried `plan_entries`, is still accepted and mapped to the `Build` phase.
#[derive(Debug, Clone, Serialize)]
pub enum AllocateRegion {
    /// Converts the allocation plan entries into repartition plans.
    Build(BuildPlan),
    /// Allocates the regions required by the previously built plans.
    Execute(ExecutePlan),
}
47
48impl<'de> Deserialize<'de> for AllocateRegion {
49    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
50    where
51        D: Deserializer<'de>,
52    {
53        #[derive(Deserialize)]
54        enum CurrentAllocateRegion {
55            Build(BuildPlan),
56            Execute(ExecutePlan),
57        }
58
59        #[derive(Deserialize)]
60        struct LegacyAllocateRegion {
61            plan_entries: Vec<AllocationPlanEntry>,
62        }
63
64        #[derive(Deserialize)]
65        #[serde(untagged)]
66        enum AllocateRegionRepr {
67            Current(CurrentAllocateRegion),
68            Legacy(LegacyAllocateRegion),
69        }
70
71        match AllocateRegionRepr::deserialize(deserializer)? {
72            AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
73                Ok(Self::Build(build_plan))
74            }
75            AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
76                Ok(Self::Execute(execute_plan))
77            }
78            AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
79                plan_entries: legacy.plan_entries,
80            })),
81        }
82    }
83}
84
/// Build phase of [`AllocateRegion`]: holds the allocation plan entries that
/// still have to be converted into repartition plans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildPlan {
    /// Allocation plan entries to convert.
    plan_entries: Vec<AllocationPlanEntry>,
}
89
90impl BuildPlan {
91    async fn next(
92        &mut self,
93        ctx: &mut Context,
94        _procedure_ctx: &ProcedureContext,
95    ) -> Result<(Box<dyn State>, Status)> {
96        let timer = Instant::now();
97        let table_id = ctx.persistent_ctx.table_id;
98        let table_route_value = ctx.get_table_route_value().await?;
99        let mut next_region_number =
100            AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
101
102        // Converts allocation plan to repartition plan.
103        let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
104            table_id,
105            &mut next_region_number,
106            &self.plan_entries,
107            table_route_value.region_routes().unwrap(),
108        )?;
109        let plan_count = repartition_plan_entries.len();
110        let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
111        info!(
112            "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
113            table_id, plan_count, to_allocate
114        );
115
116        // If no region to allocate, directly dispatch the plan.
117        if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
118            ctx.persistent_ctx.plans = repartition_plan_entries;
119            ctx.update_allocate_region_elapsed(timer.elapsed());
120            return Ok((Box::new(Dispatch), Status::executing(true)));
121        }
122
123        ctx.persistent_ctx.plans = repartition_plan_entries;
124        debug!(
125            "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
126            table_id,
127            timer.elapsed()
128        );
129        Ok((
130            Box::new(AllocateRegion::Execute(ExecutePlan)),
131            Status::executing(true),
132        ))
133    }
134}
135
/// Execute phase of [`AllocateRegion`].
///
/// It carries no data of its own; the plans to execute are read from the
/// procedure's persistent context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutePlan;
138
139impl ExecutePlan {
140    async fn next(
141        &mut self,
142        ctx: &mut Context,
143        procedure_ctx: &ProcedureContext,
144    ) -> Result<(Box<dyn State>, Status)> {
145        let timer = Instant::now();
146        let table_id = ctx.persistent_ctx.table_id;
147        let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
148        let region_number_and_partition_exprs =
149            AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
150        let table_info_value = ctx.get_table_info_value().await?;
151        let table_route_value = ctx.get_table_route_value().await?;
152        // Safety: it is physical table route value.
153        let region_routes = table_route_value.region_routes().unwrap();
154        let new_allocated_region_routes = ctx
155            .region_routes_allocator
156            .allocate(
157                table_id,
158                &region_number_and_partition_exprs
159                    .iter()
160                    .map(|(n, p)| (*n, p.as_str()))
161                    .collect::<Vec<_>>(),
162            )
163            .await
164            .context(error::AllocateRegionRoutesSnafu { table_id })?;
165        let wal_options = ctx
166            .wal_options_allocator
167            .allocate(
168                &allocate_regions
169                    .iter()
170                    .map(|r| r.region_id.region_number())
171                    .collect::<Vec<_>>(),
172                table_info_value.table_info.meta.options.skip_wal,
173            )
174            .await
175            .context(error::AllocateWalOptionsSnafu { table_id })?;
176
177        let new_region_count = new_allocated_region_routes.len();
178        let new_regions_brief: Vec<_> = new_allocated_region_routes
179            .iter()
180            .map(|route| {
181                let region_id = route.region.id;
182                let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
183                format!("region_id: {}, peer: {}", region_id, peer)
184            })
185            .collect();
186        info!(
187            "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
188            table_id, new_region_count, new_regions_brief
189        );
190
191        // The table route metadata is not updated yet; register it in memory for region lease renewal.
192        let _operating_guards = Context::register_operating_regions(
193            &ctx.memory_region_keeper,
194            &new_allocated_region_routes,
195        )?;
196        // Allocates the regions on datanodes.
197        AllocateRegion::allocate_regions(
198            &ctx.node_manager,
199            &table_info_value.table_info,
200            &new_allocated_region_routes,
201            &wal_options,
202        )
203        .await?;
204
205        // Updates the table routes.
206        let table_lock = TableLock::Write(table_id).into();
207        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
208        let new_region_routes =
209            AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
210        ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
211            .await?;
212        ctx.invalidate_table_cache().await?;
213
214        ctx.update_allocate_region_elapsed(timer.elapsed());
215        Ok((Box::new(Dispatch), Status::executing(true)))
216    }
217}
218
219#[async_trait::async_trait]
220#[typetag::serde]
221impl State for AllocateRegion {
222    async fn next(
223        &mut self,
224        ctx: &mut Context,
225        procedure_ctx: &ProcedureContext,
226    ) -> Result<(Box<dyn State>, Status)> {
227        match self {
228            AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
229            AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
230        }
231    }
232
233    fn as_any(&self) -> &dyn Any {
234        self
235    }
236}
237
238impl AllocateRegion {
239    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
240        AllocateRegion::Build(BuildPlan { plan_entries })
241    }
242
243    fn generate_region_routes(
244        region_routes: &[RegionRoute],
245        new_allocated_region_ids: &[RegionRoute],
246    ) -> Vec<RegionRoute> {
247        let region_ids = region_routes
248            .iter()
249            .map(|r| r.region.id)
250            .collect::<HashSet<_>>();
251        let mut new_region_routes = region_routes.to_vec();
252        for new_allocated_region_id in new_allocated_region_ids {
253            if !region_ids.contains(&new_allocated_region_id.region.id) {
254                new_region_routes.push(new_allocated_region_id.clone());
255            }
256        }
257        new_region_routes
258    }
259
260    /// Converts allocation plan entries to repartition plan entries.
261    ///
262    /// This method converts allocation intents into concrete repartition plans,
263    /// updates `next_region_number` for newly allocated regions, and captures
264    /// each plan's `original_target_routes` from the current table-route view.
265    ///
266    /// This also persists each plan's pre-staging target routes for rollback.
267    fn convert_to_repartition_plans(
268        table_id: TableId,
269        next_region_number: &mut RegionNumber,
270        plan_entries: &[AllocationPlanEntry],
271        current_region_routes: &[RegionRoute],
272    ) -> Result<Vec<RepartitionPlanEntry>> {
273        let region_routes_map = current_region_routes
274            .iter()
275            .map(|route| (route.region.id, route))
276            .collect::<HashMap<_, _>>();
277
278        plan_entries
279            .iter()
280            .map(|plan_entry| {
281                let mut plan = convert_allocation_plan_to_repartition_plan(
282                    table_id,
283                    next_region_number,
284                    plan_entry,
285                );
286                Self::capture_plan_original_target_routes(&mut plan, &region_routes_map)?;
287                Ok(plan)
288            })
289            .collect()
290    }
291
292    fn capture_plan_original_target_routes(
293        plan: &mut RepartitionPlanEntry,
294        region_routes_map: &HashMap<RegionId, &RegionRoute>,
295    ) -> Result<()> {
296        // Persist the pre-staging target-route view on the parent plan.
297        // Newly allocated targets are skipped because rollback deletes their
298        // route metadata rather than restoring an original target route.
299        let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
300        for target in &plan.target_regions {
301            if plan.allocated_region_ids.contains(&target.region_id) {
302                // This target region is to be allocated, so it doesn't exist in current routes.
303                continue;
304            }
305            let route = region_routes_map.get(&target.region_id).context(
306                error::RepartitionTargetRegionMissingSnafu {
307                    group_id: plan.group_id,
308                    region_id: target.region_id,
309                },
310            )?;
311            {
312                original_target_routes.push((*route).clone());
313            }
314        }
315
316        plan.original_target_routes = original_target_routes;
317        Ok(())
318    }
319
320    /// Collects all regions that need to be allocated from the repartition plan entries.
321    fn collect_allocate_regions(
322        repartition_plan_entries: &[RepartitionPlanEntry],
323    ) -> Vec<&RegionDescriptor> {
324        repartition_plan_entries
325            .iter()
326            .flat_map(|p| p.allocate_regions())
327            .collect()
328    }
329
330    /// Prepares region allocation data: region numbers and their partition expressions.
331    fn prepare_region_allocation_data(
332        allocate_regions: &[&RegionDescriptor],
333    ) -> Result<Vec<(RegionNumber, String)>> {
334        allocate_regions
335            .iter()
336            .map(|r| {
337                Ok((
338                    r.region_id.region_number(),
339                    r.partition_expr
340                        .as_json_str()
341                        .context(error::SerializePartitionExprSnafu)?,
342                ))
343            })
344            .collect()
345    }
346
347    /// Calculates the total number of regions that need to be allocated.
348    fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
349        repartition_plan_entries
350            .iter()
351            .map(|p| p.allocated_region_ids.len())
352            .sum()
353    }
354
355    /// Gets the next region number from the physical table route.
356    fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
357        max_region_number + 1
358    }
359
360    async fn allocate_regions(
361        node_manager: &NodeManagerRef,
362        raw_table_info: &TableInfo,
363        region_routes: &[RegionRoute],
364        wal_options: &HashMap<RegionNumber, String>,
365    ) -> Result<()> {
366        let table_ref = TableReference::full(
367            &raw_table_info.catalog_name,
368            &raw_table_info.schema_name,
369            &raw_table_info.name,
370        );
371        let table_id = raw_table_info.ident.table_id;
372        // Repartition allocation targets physical regions, so exclude metric internal columns
373        // and derive primary keys from tag semantics.
374        let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
375            .context(error::BuildCreateRequestSnafu { table_id })?;
376        common_telemetry::debug!(
377            "Allocating regions request, table_id: {}, request: {:?}",
378            table_id,
379            request
380        );
381        let builder = CreateRequestBuilder::new(request, None);
382        let region_count = region_routes.len();
383        let wal_region_count = wal_options.len();
384        info!(
385            "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
386            table_id, region_count, wal_region_count
387        );
388        let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
389        executor
390            .on_create_regions(node_manager, table_id, region_routes, wal_options)
391            .await
392            .context(error::AllocateRegionsSnafu { table_id })?;
393
394        Ok(())
395    }
396}
397
#[cfg(test)]
mod tests {
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a descriptor for region `region_number` of `table_id` with the
    /// partition expression `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        let partition_expr = range_expr(col, start, end);
        RegionDescriptor {
            region_id,
            partition_expr,
        }
    }

    /// Builds an allocation plan entry with a random group id.
    ///
    /// The i-th source region covers `[i * 100, (i + 1) * 100)` on column `x`;
    /// every `(start, end)` in `target_ranges` becomes one target expression.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let source_regions = source_region_numbers
            .iter()
            .zip((0i64..).step_by(100))
            .map(|(&number, start)| {
                create_region_descriptor(table_id, number, "x", start, start + 100)
            })
            .collect();

        let target_partition_exprs = target_ranges
            .iter()
            .map(|&(start, end)| range_expr("x", start, end))
            .collect();

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    /// Builds one route per region number, each led by peer 1.
    fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
        region_numbers
            .iter()
            .copied()
            .map(|number| {
                let region = Region {
                    id: RegionId::new(table_id, number),
                    ..Default::default()
                };
                RegionRoute {
                    region,
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                }
            })
            .collect()
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // 2 source -> 2 target (no allocation needed)
        let entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];
        let current_routes = create_current_region_routes(table_id, &[1, 2]);

        let plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &entries,
            &current_routes,
        )
        .unwrap();

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 2);
        assert!(plan.allocated_region_ids.is_empty());
        // next_region_number should not change
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // 2 source -> 4 target (need to allocate 2 regions)
        let entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];
        let current_routes = create_current_region_routes(table_id, &[1, 2]);

        let plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &entries,
            &current_routes,
        )
        .unwrap();

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 4);
        assert_eq!(plan.allocated_region_ids.len(), 2);
        // The new regions take the next free region numbers, in order.
        for (idx, expected_number) in [10, 11].into_iter().enumerate() {
            assert_eq!(
                plan.allocated_region_ids[idx],
                RegionId::new(table_id, expected_number)
            );
        }
        // next_region_number should be incremented by 2
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Multiple plan entries with different allocation needs
        let entries = vec![
            // need 1 allocation
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // no allocation
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]),
            // need 2 allocations
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ];
        let current_routes = create_current_region_routes(table_id, &[1, 2, 3, 4]);

        let plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &entries,
            &current_routes,
        )
        .unwrap();

        assert_eq!(plans.len(), 3);
        for (plan, expected_allocations) in plans.iter().zip([1, 0, 2]) {
            assert_eq!(plan.allocated_region_ids.len(), expected_allocations);
        }
        // next_region_number should be incremented by 3 total
        assert_eq!(next_region_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let entries = vec![
            // 1 allocation
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 0 allocation (deallocate)
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]),
            // 1 allocation
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]),
        ];
        let current_routes = create_current_region_routes(table_id, &[1, 2, 3, 4]);

        let plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &entries,
            &current_routes,
        )
        .unwrap();

        assert_eq!(AllocateRegion::count_regions_to_allocate(&plans), 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let entries = vec![
            // 1 allocation
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 1 allocation
            create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]),
        ];
        let current_routes = create_current_region_routes(table_id, &[1, 2]);

        let plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &entries,
            &current_routes,
        )
        .unwrap();

        let to_allocate = AllocateRegion::collect_allocate_regions(&plans);
        assert_eq!(to_allocate.len(), 2);
        assert_eq!(to_allocate[0].region_id, RegionId::new(table_id, 10));
        assert_eq!(to_allocate[1].region_id, RegionId::new(table_id, 11));
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let table_id = 1024;
        let descriptors = [
            create_region_descriptor(table_id, 10, "x", 0, 50),
            create_region_descriptor(table_id, 11, "x", 50, 100),
        ];
        let descriptor_refs: Vec<&RegionDescriptor> = descriptors.iter().collect();

        let prepared = AllocateRegion::prepare_region_allocation_data(&descriptor_refs).unwrap();

        assert_eq!(prepared.len(), 2);
        for ((region_number, partition_expr), expected_number) in prepared.iter().zip([10, 11]) {
            assert_eq!(*region_number, expected_number);
            // Verify partition expressions are serialized
            assert!(!partition_expr.is_empty());
        }
    }

    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Arrange: payload in the legacy layout, without a `Build` wrapper.
        let legacy_json = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;

        // Act
        let state: Box<dyn State> = serde_json::from_str(legacy_json).unwrap();

        // Assert
        let allocate_region = state
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        let AllocateRegion::Build(build_plan) = allocate_region else {
            panic!("expected build plan")
        };
        assert!(build_plan.plan_entries.is_empty());
    }

    #[test]
    fn test_allocate_region_state_round_trip() {
        // Arrange
        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));

        // Act
        let json = serde_json::to_string(&state).unwrap();
        let restored: Box<dyn State> = serde_json::from_str(&json).unwrap();

        // Assert
        assert_eq!(
            json,
            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
        );
        let allocate_region = restored
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        let AllocateRegion::Build(build_plan) = allocate_region else {
            panic!("expected build plan")
        };
        assert!(build_plan.plan_entries.is_empty());
    }

    #[test]
    fn test_allocate_region_execute_state_round_trip() {
        // Arrange
        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));

        // Act
        let json = serde_json::to_string(&state).unwrap();
        let restored: Box<dyn State> = serde_json::from_str(&json).unwrap();

        // Assert
        assert_eq!(
            json,
            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
        );
        let allocate_region = restored
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        let AllocateRegion::Execute(_) = allocate_region else {
            panic!("expected execute plan")
        };
    }
}