meta_srv/procedure/repartition/
dispatch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
18use serde::{Deserialize, Serialize};
19
20use crate::error::Result;
21use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
22use crate::procedure::repartition::group::RepartitionGroupProcedure;
23use crate::procedure::repartition::{self, Context, State};
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct Dispatch;
27
28#[async_trait::async_trait]
29#[typetag::serde]
30impl State for Dispatch {
31    async fn next(
32        &mut self,
33        ctx: &mut Context,
34        _procedure_ctx: &ProcedureContext,
35    ) -> Result<(Box<dyn State>, Status)> {
36        let table_id = ctx.persistent_ctx.table_id;
37        let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
38        let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
39        for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
40            let persistent_ctx = repartition::group::PersistentContext::new(
41                plan.group_id,
42                table_id,
43                plan.source_regions.clone(),
44                plan.target_regions.clone(),
45            );
46
47            let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
48            let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
49            procedure_metas.push(ProcedureMeta {
50                plan_index,
51                group_id: plan.group_id,
52                procedure_id: procedure.id,
53            });
54            procedures.push(procedure);
55        }
56
57        Ok((
58            Box::new(Collect::new(procedure_metas)),
59            Status::suspended(procedures, true),
60        ))
61    }
62
63    fn as_any(&self) -> &dyn Any {
64        self
65    }
66}