meta_srv/procedure/repartition/
collect.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18use common_telemetry::error;
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21
22use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24use crate::procedure::repartition::group::GroupId;
25use crate::procedure::repartition::{Context, State};
26
27/// Metadata for tracking a dispatched sub-procedure.
28#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
29pub struct ProcedureMeta {
30    /// The index of the plan entry in the parent procedure's plan list.
31    pub plan_index: usize,
32    /// The group id of the repartition group.
33    pub group_id: GroupId,
34    /// The procedure id of the sub-procedure.
35    pub procedure_id: ProcedureId,
36}
37
38/// State for collecting results from dispatched sub-procedures.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct Collect {
41    /// Sub-procedures that are currently in-flight.
42    pub inflight_procedures: Vec<ProcedureMeta>,
43    /// Sub-procedures that have completed successfully.
44    pub succeeded_procedures: Vec<ProcedureMeta>,
45    /// Sub-procedures that have failed.
46    pub failed_procedures: Vec<ProcedureMeta>,
47    /// Sub-procedures whose state could not be determined.
48    pub unknown_procedures: Vec<ProcedureMeta>,
49}
50
51impl Collect {
52    pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53        Self {
54            inflight_procedures,
55            succeeded_procedures: Vec::new(),
56            failed_procedures: Vec::new(),
57            unknown_procedures: Vec::new(),
58        }
59    }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for Collect {
65    async fn next(
66        &mut self,
67        _ctx: &mut Context,
68        procedure_ctx: &ProcedureContext,
69    ) -> Result<(Box<dyn State>, Status)> {
70        for procedure_meta in self.inflight_procedures.iter() {
71            let procedure_id = procedure_meta.procedure_id;
72            let group_id = procedure_meta.group_id;
73            let Some(mut receiver) = procedure_ctx
74                .provider
75                .procedure_state_receiver(procedure_id)
76                .await
77                .context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
78            else {
79                error!(
80                    "failed to get procedure state receiver, procedure_id: {}, group_id: {}",
81                    procedure_id, group_id
82                );
83                self.unknown_procedures.push(*procedure_meta);
84                continue;
85            };
86
87            match watcher::wait(&mut receiver).await {
88                Ok(_) => self.succeeded_procedures.push(*procedure_meta),
89                Err(e) => {
90                    error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
91                    self.failed_procedures.push(*procedure_meta);
92                }
93            }
94        }
95
96        if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
97            // TODO(weny): retry the failed or unknown procedures.
98        }
99
100        Ok((Box::new(DeallocateRegion), Status::executing(true)))
101    }
102
103    fn as_any(&self) -> &dyn Any {
104        self
105    }
106}