Skip to main content

meta_srv/procedure/repartition/
allocate_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20    CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::{debug, info};
27use serde::{Deserialize, Deserializer, Serialize};
28use snafu::ResultExt;
29use store_api::storage::{RegionNumber, TableId};
30use table::metadata::TableInfo;
31use table::table_reference::TableReference;
32use tokio::time::Instant;
33
34use crate::error::{self, Result};
35use crate::procedure::repartition::dispatch::Dispatch;
36use crate::procedure::repartition::plan::{
37    AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
38    convert_allocation_plan_to_repartition_plan,
39};
40use crate::procedure::repartition::{Context, State};
41
42#[derive(Debug, Clone, Serialize)]
43pub enum AllocateRegion {
44    Build(BuildPlan),
45    Execute(ExecutePlan),
46}
47
48impl<'de> Deserialize<'de> for AllocateRegion {
49    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
50    where
51        D: Deserializer<'de>,
52    {
53        #[derive(Deserialize)]
54        enum CurrentAllocateRegion {
55            Build(BuildPlan),
56            Execute(ExecutePlan),
57        }
58
59        #[derive(Deserialize)]
60        struct LegacyAllocateRegion {
61            plan_entries: Vec<AllocationPlanEntry>,
62        }
63
64        #[derive(Deserialize)]
65        #[serde(untagged)]
66        enum AllocateRegionRepr {
67            Current(CurrentAllocateRegion),
68            Legacy(LegacyAllocateRegion),
69        }
70
71        match AllocateRegionRepr::deserialize(deserializer)? {
72            AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
73                Ok(Self::Build(build_plan))
74            }
75            AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
76                Ok(Self::Execute(execute_plan))
77            }
78            AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
79                plan_entries: legacy.plan_entries,
80            })),
81        }
82    }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct BuildPlan {
87    plan_entries: Vec<AllocationPlanEntry>,
88}
89
90impl BuildPlan {
91    async fn next(
92        &mut self,
93        ctx: &mut Context,
94        _procedure_ctx: &ProcedureContext,
95    ) -> Result<(Box<dyn State>, Status)> {
96        let timer = Instant::now();
97        let table_id = ctx.persistent_ctx.table_id;
98        let table_route_value = ctx.get_table_route_value().await?;
99        let mut next_region_number =
100            AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
101
102        // Converts allocation plan to repartition plan.
103        let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
104            table_id,
105            &mut next_region_number,
106            &self.plan_entries,
107        );
108        let plan_count = repartition_plan_entries.len();
109        let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
110        info!(
111            "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
112            table_id, plan_count, to_allocate
113        );
114
115        // If no region to allocate, directly dispatch the plan.
116        if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
117            ctx.persistent_ctx.plans = repartition_plan_entries;
118            ctx.update_allocate_region_elapsed(timer.elapsed());
119            return Ok((Box::new(Dispatch), Status::executing(true)));
120        }
121
122        ctx.persistent_ctx.plans = repartition_plan_entries;
123        debug!(
124            "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
125            table_id,
126            timer.elapsed()
127        );
128        Ok((
129            Box::new(AllocateRegion::Execute(ExecutePlan)),
130            Status::executing(true),
131        ))
132    }
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct ExecutePlan;
137
138impl ExecutePlan {
139    async fn next(
140        &mut self,
141        ctx: &mut Context,
142        procedure_ctx: &ProcedureContext,
143    ) -> Result<(Box<dyn State>, Status)> {
144        let timer = Instant::now();
145        let table_id = ctx.persistent_ctx.table_id;
146        let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
147        let region_number_and_partition_exprs =
148            AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
149        let table_info_value = ctx.get_table_info_value().await?;
150        let table_route_value = ctx.get_table_route_value().await?;
151        // Safety: it is physical table route value.
152        let region_routes = table_route_value.region_routes().unwrap();
153        let new_allocated_region_routes = ctx
154            .region_routes_allocator
155            .allocate(
156                table_id,
157                &region_number_and_partition_exprs
158                    .iter()
159                    .map(|(n, p)| (*n, p.as_str()))
160                    .collect::<Vec<_>>(),
161            )
162            .await
163            .context(error::AllocateRegionRoutesSnafu { table_id })?;
164        let wal_options = ctx
165            .wal_options_allocator
166            .allocate(
167                &allocate_regions
168                    .iter()
169                    .map(|r| r.region_id.region_number())
170                    .collect::<Vec<_>>(),
171                table_info_value.table_info.meta.options.skip_wal,
172            )
173            .await
174            .context(error::AllocateWalOptionsSnafu { table_id })?;
175
176        let new_region_count = new_allocated_region_routes.len();
177        let new_regions_brief: Vec<_> = new_allocated_region_routes
178            .iter()
179            .map(|route| {
180                let region_id = route.region.id;
181                let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
182                format!("region_id: {}, peer: {}", region_id, peer)
183            })
184            .collect();
185        info!(
186            "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
187            table_id, new_region_count, new_regions_brief
188        );
189
190        // The table route metadata is not updated yet; register it in memory for region lease renewal.
191        let _operating_guards = Context::register_operating_regions(
192            &ctx.memory_region_keeper,
193            &new_allocated_region_routes,
194        )?;
195        // Allocates the regions on datanodes.
196        AllocateRegion::allocate_regions(
197            &ctx.node_manager,
198            &table_info_value.table_info,
199            &new_allocated_region_routes,
200            &wal_options,
201        )
202        .await?;
203
204        // Updates the table routes.
205        let table_lock = TableLock::Write(table_id).into();
206        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
207        let new_region_routes =
208            AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
209        ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
210            .await?;
211        ctx.invalidate_table_cache().await?;
212
213        ctx.update_allocate_region_elapsed(timer.elapsed());
214        Ok((Box::new(Dispatch), Status::executing(true)))
215    }
216}
217
218#[async_trait::async_trait]
219#[typetag::serde]
220impl State for AllocateRegion {
221    async fn next(
222        &mut self,
223        ctx: &mut Context,
224        procedure_ctx: &ProcedureContext,
225    ) -> Result<(Box<dyn State>, Status)> {
226        match self {
227            AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
228            AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
229        }
230    }
231
232    fn as_any(&self) -> &dyn Any {
233        self
234    }
235}
236
237impl AllocateRegion {
238    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
239        AllocateRegion::Build(BuildPlan { plan_entries })
240    }
241
242    fn generate_region_routes(
243        region_routes: &[RegionRoute],
244        new_allocated_region_ids: &[RegionRoute],
245    ) -> Vec<RegionRoute> {
246        let region_ids = region_routes
247            .iter()
248            .map(|r| r.region.id)
249            .collect::<HashSet<_>>();
250        let mut new_region_routes = region_routes.to_vec();
251        for new_allocated_region_id in new_allocated_region_ids {
252            if !region_ids.contains(&new_allocated_region_id.region.id) {
253                new_region_routes.push(new_allocated_region_id.clone());
254            }
255        }
256        new_region_routes
257    }
258
259    /// Converts allocation plan entries to repartition plan entries.
260    ///
261    /// This method takes the allocation plan entries and converts them to repartition plan entries,
262    /// updating `next_region_number` for each newly allocated region.
263    fn convert_to_repartition_plans(
264        table_id: TableId,
265        next_region_number: &mut RegionNumber,
266        plan_entries: &[AllocationPlanEntry],
267    ) -> Vec<RepartitionPlanEntry> {
268        plan_entries
269            .iter()
270            .map(|plan_entry| {
271                convert_allocation_plan_to_repartition_plan(
272                    table_id,
273                    next_region_number,
274                    plan_entry,
275                )
276            })
277            .collect()
278    }
279
280    /// Collects all regions that need to be allocated from the repartition plan entries.
281    fn collect_allocate_regions(
282        repartition_plan_entries: &[RepartitionPlanEntry],
283    ) -> Vec<&RegionDescriptor> {
284        repartition_plan_entries
285            .iter()
286            .flat_map(|p| p.allocate_regions())
287            .collect()
288    }
289
290    /// Prepares region allocation data: region numbers and their partition expressions.
291    fn prepare_region_allocation_data(
292        allocate_regions: &[&RegionDescriptor],
293    ) -> Result<Vec<(RegionNumber, String)>> {
294        allocate_regions
295            .iter()
296            .map(|r| {
297                Ok((
298                    r.region_id.region_number(),
299                    r.partition_expr
300                        .as_json_str()
301                        .context(error::SerializePartitionExprSnafu)?,
302                ))
303            })
304            .collect()
305    }
306
307    /// Calculates the total number of regions that need to be allocated.
308    fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
309        repartition_plan_entries
310            .iter()
311            .map(|p| p.allocated_region_ids.len())
312            .sum()
313    }
314
315    /// Gets the next region number from the physical table route.
316    fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
317        max_region_number + 1
318    }
319
320    async fn allocate_regions(
321        node_manager: &NodeManagerRef,
322        raw_table_info: &TableInfo,
323        region_routes: &[RegionRoute],
324        wal_options: &HashMap<RegionNumber, String>,
325    ) -> Result<()> {
326        let table_ref = TableReference::full(
327            &raw_table_info.catalog_name,
328            &raw_table_info.schema_name,
329            &raw_table_info.name,
330        );
331        let table_id = raw_table_info.ident.table_id;
332        // Repartition allocation targets physical regions, so exclude metric internal columns
333        // and derive primary keys from tag semantics.
334        let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
335            .context(error::BuildCreateRequestSnafu { table_id })?;
336        common_telemetry::debug!(
337            "Allocating regions request, table_id: {}, request: {:?}",
338            table_id,
339            request
340        );
341        let builder = CreateRequestBuilder::new(request, None);
342        let region_count = region_routes.len();
343        let wal_region_count = wal_options.len();
344        info!(
345            "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
346            table_id, region_count, wal_region_count
347        );
348        let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
349        executor
350            .on_create_regions(node_manager, table_id, region_routes, wal_options)
351            .await
352            .context(error::AllocateRegionsSnafu { table_id })?;
353
354        Ok(())
355    }
356}
357
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::test_util::range_expr;

    /// Table id shared by every test case.
    const TABLE_ID: TableId = 1024;
    /// Region number the conversion helpers start allocating from.
    const FIRST_NEW_REGION_NUMBER: RegionNumber = 10;

    /// Builds a descriptor for region `region_number` of `table_id` whose partition
    /// expression is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        let partition_expr = range_expr(col, start, end);
        RegionDescriptor {
            region_id,
            partition_expr,
        }
    }

    /// Builds an allocation plan entry with a fresh group id.
    ///
    /// The source region at position `i` gets `range_expr("x", i * 100, (i + 1) * 100)`;
    /// every `(start, end)` in `target_ranges` becomes one target partition expression.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let source_regions = source_region_numbers
            .iter()
            .zip(0i64..)
            .map(|(&region_number, position)| {
                create_region_descriptor(
                    table_id,
                    region_number,
                    "x",
                    position * 100,
                    (position + 1) * 100,
                )
            })
            .collect();
        let target_partition_exprs = target_ranges
            .iter()
            .map(|&(start, end)| range_expr("x", start, end))
            .collect();

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    /// Converts `plan_entries` starting at [`FIRST_NEW_REGION_NUMBER`] and returns the
    /// repartition plans together with the advanced next region number.
    fn convert(plan_entries: &[AllocationPlanEntry]) -> (Vec<RepartitionPlanEntry>, RegionNumber) {
        let mut next_region_number = FIRST_NEW_REGION_NUMBER;
        let plans = AllocateRegion::convert_to_repartition_plans(
            TABLE_ID,
            &mut next_region_number,
            plan_entries,
        );
        (plans, next_region_number)
    }

    /// Downcasts a state trait object to [`AllocateRegion`], panicking otherwise.
    fn as_allocate_region(state: &dyn State) -> &AllocateRegion {
        state
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state")
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        // 2 sources -> 2 targets: the existing regions are enough.
        let plan_entries = [create_allocation_plan_entry(
            TABLE_ID,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];

        let (plans, next_region_number) = convert(&plan_entries);

        assert_eq!(plans.len(), 1);
        assert_eq!(plans[0].target_regions.len(), 2);
        assert!(plans[0].allocated_region_ids.is_empty());
        // Nothing was allocated, so the counter must not move.
        assert_eq!(next_region_number, FIRST_NEW_REGION_NUMBER);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        // 2 sources -> 4 targets: two new regions are needed.
        let plan_entries = [create_allocation_plan_entry(
            TABLE_ID,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];

        let (plans, next_region_number) = convert(&plan_entries);

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 4);
        assert_eq!(plan.allocated_region_ids.len(), 2);
        assert_eq!(plan.allocated_region_ids[0], RegionId::new(TABLE_ID, 10));
        assert_eq!(plan.allocated_region_ids[1], RegionId::new(TABLE_ID, 11));
        // Advanced once per allocated region.
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let plan_entries = [
            // 1 -> 2: one new region.
            create_allocation_plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            // 2 -> 2: no new region.
            create_allocation_plan_entry(TABLE_ID, &[2, 3], &[(100, 150), (150, 200)]),
            // 1 -> 3: two new regions.
            create_allocation_plan_entry(TABLE_ID, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ];

        let (plans, next_region_number) = convert(&plan_entries);

        assert_eq!(plans.len(), 3);
        let allocated_per_plan: Vec<usize> = plans
            .iter()
            .map(|plan| plan.allocated_region_ids.len())
            .collect();
        assert_eq!(allocated_per_plan, vec![1, 0, 2]);
        // Three regions allocated in total.
        assert_eq!(next_region_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let plan_entries = [
            // 1 -> 2: one new region.
            create_allocation_plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            // 2 -> 1: merge, no new region.
            create_allocation_plan_entry(TABLE_ID, &[2, 3], &[(100, 200)]),
            // 1 -> 2: one new region.
            create_allocation_plan_entry(TABLE_ID, &[4], &[(200, 250), (250, 300)]),
        ];

        let (plans, _) = convert(&plan_entries);

        assert_eq!(AllocateRegion::count_regions_to_allocate(&plans), 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let plan_entries = [
            create_allocation_plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            create_allocation_plan_entry(TABLE_ID, &[2], &[(100, 150), (150, 200)]),
        ];
        let (plans, _) = convert(&plan_entries);

        let regions = AllocateRegion::collect_allocate_regions(&plans);

        assert_eq!(regions.len(), 2);
        // New regions are numbered consecutively across plan entries.
        assert_eq!(regions[0].region_id, RegionId::new(TABLE_ID, 10));
        assert_eq!(regions[1].region_id, RegionId::new(TABLE_ID, 11));
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let descriptors = [
            create_region_descriptor(TABLE_ID, 10, "x", 0, 50),
            create_region_descriptor(TABLE_ID, 11, "x", 50, 100),
        ];
        let descriptor_refs = descriptors.iter().collect::<Vec<&RegionDescriptor>>();

        let data = AllocateRegion::prepare_region_allocation_data(&descriptor_refs).unwrap();

        assert_eq!(data.len(), 2);
        let region_numbers: Vec<RegionNumber> = data.iter().map(|(number, _)| *number).collect();
        assert_eq!(region_numbers, vec![10, 11]);
        // Every partition expression was serialized to a non-empty string.
        assert!(data.iter().all(|(_, expr)| !expr.is_empty()));
    }

    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Legacy layout: plan entries at the top level, without a phase tag.
        let legacy_json = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;

        let state: Box<dyn State> = serde_json::from_str(legacy_json).unwrap();

        let AllocateRegion::Build(build_plan) = as_allocate_region(&*state) else {
            panic!("expected build plan");
        };
        assert!(build_plan.plan_entries.is_empty());
    }

    #[test]
    fn test_allocate_region_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));

        let json = serde_json::to_string(&state).unwrap();
        let restored: Box<dyn State> = serde_json::from_str(&json).unwrap();

        assert_eq!(
            json,
            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
        );
        let AllocateRegion::Build(build_plan) = as_allocate_region(&*restored) else {
            panic!("expected build plan");
        };
        assert!(build_plan.plan_entries.is_empty());
    }

    #[test]
    fn test_allocate_region_execute_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));

        let json = serde_json::to_string(&state).unwrap();
        let restored: Box<dyn State> = serde_json::from_str(&json).unwrap();

        assert_eq!(
            json,
            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
        );
        assert!(
            matches!(as_allocate_region(&*restored), AllocateRegion::Execute(_)),
            "expected execute plan"
        );
    }
}