meta_srv/procedure/repartition/
dispatch.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::time::Instant;
18
19use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
27use crate::procedure::repartition::group::RepartitionGroupProcedure;
28use crate::procedure::repartition::plan::RegionDescriptor;
29use crate::procedure::repartition::{self, Context, State};
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Dispatch;
33
34fn build_region_mapping(
35 source_regions: &[RegionDescriptor],
36 target_regions: &[RegionDescriptor],
37 transition_map: &[Vec<usize>],
38) -> HashMap<RegionId, Vec<RegionId>> {
39 transition_map
40 .iter()
41 .enumerate()
42 .map(|(source_idx, indices)| {
43 let source_region = source_regions[source_idx].region_id;
44 let target_regions = indices
45 .iter()
46 .map(|&target_idx| target_regions[target_idx].region_id)
47 .collect::<Vec<_>>();
48 (source_region, target_regions)
49 })
50 .collect::<HashMap<RegionId, _>>()
51}
52
53#[async_trait::async_trait]
54#[typetag::serde]
55impl State for Dispatch {
56 async fn next(
57 &mut self,
58 ctx: &mut Context,
59 _procedure_ctx: &ProcedureContext,
60 ) -> Result<(Box<dyn State>, Status)> {
61 ctx.volatile_ctx.dispatch_start_time = Some(Instant::now());
62 let table_id = ctx.persistent_ctx.table_id;
63 let table_info_value = ctx.get_table_info_value().await?;
64 let table_engine = table_info_value.table_info.meta.engine;
65 let sync_region = table_engine == METRIC_ENGINE_NAME;
66 let plan_count = ctx.persistent_ctx.plans.len();
67 let mut procedures = Vec::with_capacity(plan_count);
68 let mut procedure_metas = Vec::with_capacity(plan_count);
69 for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
70 let region_mapping = build_region_mapping(
71 &plan.source_regions,
72 &plan.target_regions,
73 &plan.transition_map,
74 );
75 let persistent_ctx = repartition::group::PersistentContext::new(
76 plan.group_id,
77 table_id,
78 ctx.persistent_ctx.catalog_name.clone(),
79 ctx.persistent_ctx.schema_name.clone(),
80 plan.source_regions.clone(),
81 plan.target_regions.clone(),
82 region_mapping,
83 sync_region,
84 plan.allocated_region_ids.clone(),
85 plan.pending_deallocate_region_ids.clone(),
86 ctx.persistent_ctx.timeout,
87 );
88
89 let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
90 let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
91 procedure_metas.push(ProcedureMeta {
92 plan_index,
93 group_id: plan.group_id,
94 procedure_id: procedure.id,
95 });
96 procedures.push(procedure);
97 }
98
99 let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect();
100 info!(
101 "Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}",
102 table_id,
103 group_ids.len(),
104 group_ids
105 );
106
107 Ok((
108 Box::new(Collect::new(procedure_metas)),
109 Status::suspended(procedures, true),
110 ))
111 }
112
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116}