meta_srv/procedure/repartition/collect.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18use common_telemetry::{error, info};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21
22use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24use crate::procedure::repartition::group::GroupId;
25use crate::procedure::repartition::{Context, State};
26
27/// Metadata for tracking a dispatched sub-procedure.
28#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
29pub struct ProcedureMeta {
30    /// The index of the plan entry in the parent procedure's plan list.
31    pub plan_index: usize,
32    /// The group id of the repartition group.
33    pub group_id: GroupId,
34    /// The procedure id of the sub-procedure.
35    pub procedure_id: ProcedureId,
36}
37
38/// State for collecting results from dispatched sub-procedures.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct Collect {
41    /// Sub-procedures that are currently in-flight.
42    pub inflight_procedures: Vec<ProcedureMeta>,
43    /// Sub-procedures that have completed successfully.
44    pub succeeded_procedures: Vec<ProcedureMeta>,
45    /// Sub-procedures that have failed.
46    pub failed_procedures: Vec<ProcedureMeta>,
47    /// Sub-procedures whose state could not be determined.
48    pub unknown_procedures: Vec<ProcedureMeta>,
49}
50
51impl Collect {
52    pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53        Self {
54            inflight_procedures,
55            succeeded_procedures: Vec::new(),
56            failed_procedures: Vec::new(),
57            unknown_procedures: Vec::new(),
58        }
59    }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for Collect {
65    async fn next(
66        &mut self,
67        ctx: &mut Context,
68        procedure_ctx: &ProcedureContext,
69    ) -> Result<(Box<dyn State>, Status)> {
70        let table_id = ctx.persistent_ctx.table_id;
71        for procedure_meta in self.inflight_procedures.iter() {
72            let procedure_id = procedure_meta.procedure_id;
73            let group_id = procedure_meta.group_id;
74            let Some(mut receiver) = procedure_ctx
75                .provider
76                .procedure_state_receiver(procedure_id)
77                .await
78                .context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
79            else {
80                error!(
81                    "failed to get procedure state receiver, procedure_id: {}, group_id: {}",
82                    procedure_id, group_id
83                );
84                self.unknown_procedures.push(*procedure_meta);
85                continue;
86            };
87
88            match watcher::wait(&mut receiver).await {
89                Ok(_) => self.succeeded_procedures.push(*procedure_meta),
90                Err(e) => {
91                    error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
92                    self.failed_procedures.push(*procedure_meta);
93                }
94            }
95        }
96
97        let inflight = self.inflight_procedures.len();
98        let succeeded = self.succeeded_procedures.len();
99        let failed = self.failed_procedures.len();
100        let unknown = self.unknown_procedures.len();
101        info!(
102            "Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
103            table_id, inflight, succeeded, failed, unknown
104        );
105
106        if failed > 0 || unknown > 0 {
107            // TODO(weny): retry the failed or unknown procedures.
108        }
109
110        if let Some(start_time) = ctx.volatile_ctx.dispatch_start_time.take() {
111            ctx.update_finish_groups_elapsed(start_time.elapsed());
112        }
113
114        Ok((Box::new(DeallocateRegion), Status::executing(true)))
115    }
116
117    fn as_any(&self) -> &dyn Any {
118        self
119    }
120}