meta_srv/procedure/repartition/
dispatch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::Instant;
18
19use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
27use crate::procedure::repartition::group::RepartitionGroupProcedure;
28use crate::procedure::repartition::plan::RegionDescriptor;
29use crate::procedure::repartition::{self, Context, State};
30
/// Repartition procedure state that dispatches the planned groups.
///
/// Its [`State::next`] implementation builds one [`RepartitionGroupProcedure`] per
/// plan stored in the persistent context, then returns [`Collect`] as the next
/// state together with a suspended [`Status`] that carries those procedures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
33
34fn build_region_mapping(
35    source_regions: &[RegionDescriptor],
36    target_regions: &[RegionDescriptor],
37    transition_map: &[Vec<usize>],
38) -> HashMap<RegionId, Vec<RegionId>> {
39    transition_map
40        .iter()
41        .enumerate()
42        .map(|(source_idx, indices)| {
43            let source_region = source_regions[source_idx].region_id;
44            let target_regions = indices
45                .iter()
46                .map(|&target_idx| target_regions[target_idx].region_id)
47                .collect::<Vec<_>>();
48            (source_region, target_regions)
49        })
50        .collect::<HashMap<RegionId, _>>()
51}
52
53#[async_trait::async_trait]
54#[typetag::serde]
55impl State for Dispatch {
56    async fn next(
57        &mut self,
58        ctx: &mut Context,
59        _procedure_ctx: &ProcedureContext,
60    ) -> Result<(Box<dyn State>, Status)> {
61        ctx.volatile_ctx.dispatch_start_time = Some(Instant::now());
62        let table_id = ctx.persistent_ctx.table_id;
63        let table_info_value = ctx.get_table_info_value().await?;
64        let table_engine = table_info_value.table_info.meta.engine;
65        let sync_region = table_engine == METRIC_ENGINE_NAME;
66        let plan_count = ctx.persistent_ctx.plans.len();
67        let mut procedures = Vec::with_capacity(plan_count);
68        let mut procedure_metas = Vec::with_capacity(plan_count);
69        for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
70            let region_mapping = build_region_mapping(
71                &plan.source_regions,
72                &plan.target_regions,
73                &plan.transition_map,
74            );
75            let persistent_ctx = repartition::group::PersistentContext::new(
76                plan.group_id,
77                table_id,
78                ctx.persistent_ctx.catalog_name.clone(),
79                ctx.persistent_ctx.schema_name.clone(),
80                plan.source_regions.clone(),
81                plan.target_regions.clone(),
82                region_mapping,
83                sync_region,
84                plan.allocated_region_ids.clone(),
85                plan.pending_deallocate_region_ids.clone(),
86                ctx.persistent_ctx.timeout,
87            );
88
89            let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
90            let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
91            procedure_metas.push(ProcedureMeta {
92                plan_index,
93                group_id: plan.group_id,
94                procedure_id: procedure.id,
95            });
96            procedures.push(procedure);
97        }
98
99        let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect();
100        info!(
101            "Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}",
102            table_id,
103            group_ids.len(),
104            group_ids
105        );
106
107        Ok((
108            Box::new(Collect::new(procedure_metas)),
109            Status::suspended(procedures, true),
110        ))
111    }
112
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116}