Skip to main content

meta_srv/procedure/repartition/
collect.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18use common_telemetry::{error, info};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21
22use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24use crate::procedure::repartition::group::GroupId;
25use crate::procedure::repartition::{Context, State};
26
27/// Metadata for tracking a dispatched sub-procedure.
28#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
29pub struct ProcedureMeta {
30    /// The index of the plan entry in the parent procedure's plan list.
31    pub plan_index: usize,
32    /// The group id of the repartition group.
33    pub group_id: GroupId,
34    /// The procedure id of the sub-procedure.
35    pub procedure_id: ProcedureId,
36}
37
38/// State for collecting results from dispatched sub-procedures.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct Collect {
41    /// Sub-procedures that are currently in-flight.
42    pub inflight_procedures: Vec<ProcedureMeta>,
43    /// Sub-procedures that have completed successfully.
44    pub succeeded_procedures: Vec<ProcedureMeta>,
45    /// Sub-procedures that have failed.
46    pub failed_procedures: Vec<ProcedureMeta>,
47    /// Sub-procedures whose state could not be determined.
48    pub unknown_procedures: Vec<ProcedureMeta>,
49}
50
51impl Collect {
52    pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53        Self {
54            inflight_procedures,
55            succeeded_procedures: Vec::new(),
56            failed_procedures: Vec::new(),
57            unknown_procedures: Vec::new(),
58        }
59    }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for Collect {
65    async fn next(
66        &mut self,
67        ctx: &mut Context,
68        procedure_ctx: &ProcedureContext,
69    ) -> Result<(Box<dyn State>, Status)> {
70        let table_id = ctx.persistent_ctx.table_id;
71        for procedure_meta in self.inflight_procedures.iter() {
72            let procedure_id = procedure_meta.procedure_id;
73            let group_id = procedure_meta.group_id;
74            let Some(mut receiver) = procedure_ctx
75                .provider
76                .procedure_state_receiver(procedure_id)
77                .await
78                .context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
79            else {
80                error!(
81                    "failed to get procedure state receiver, procedure_id: {}, group_id: {}",
82                    procedure_id, group_id
83                );
84                self.unknown_procedures.push(*procedure_meta);
85                continue;
86            };
87
88            match watcher::wait(&mut receiver).await {
89                Ok(_) => self.succeeded_procedures.push(*procedure_meta),
90                Err(e) => {
91                    error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
92                    self.failed_procedures.push(*procedure_meta);
93                }
94            }
95        }
96
97        let succeeded = self.succeeded_procedures.len();
98        let failed = self.failed_procedures.len();
99        let unknown = self.unknown_procedures.len();
100        info!(
101            "Collected repartition group results for table_id: {}, succeeded: {}, failed: {}, unknown: {}",
102            table_id, succeeded, failed, unknown
103        );
104
105        if failed > 0 || unknown > 0 {
106            ctx.persistent_ctx
107                .failed_procedures
108                .extend(self.failed_procedures.iter());
109            ctx.persistent_ctx
110                .unknown_procedures
111                .extend(self.unknown_procedures.iter());
112            return crate::error::UnexpectedSnafu {
113                violated: format!(
114                    "Repartition groups failed or became unknown, table_id: {}, failed: {}, unknown: {}",
115                    table_id, failed, unknown
116                ),
117            }
118            .fail();
119        }
120
121        if let Some(start_time) = ctx.volatile_ctx.dispatch_start_time.take() {
122            ctx.update_finish_groups_elapsed(start_time.elapsed());
123        }
124
125        Ok((Box::new(DeallocateRegion), Status::executing(true)))
126    }
127
128    fn as_any(&self) -> &dyn Any {
129        self
130    }
131}
132
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use common_meta::test_util::MockDatanodeManager;
    use common_procedure::{
        Context as ProcedureContext, ContextProvider, Error as ProcedureError, ProcedureId,
        ProcedureState,
    };
    use common_procedure_test::MockContextProvider;
    use tokio::sync::watch;

    use super::*;
    use crate::procedure::repartition::PersistentContext;
    use crate::procedure::repartition::test_util::TestingEnv;

    /// A [`ContextProvider`] that returns a clone of one fixed state receiver for every
    /// procedure id and forwards all other calls to a [`MockContextProvider`].
    struct FailedProcedureContextProvider {
        receiver: watch::Receiver<ProcedureState>,
        inner: MockContextProvider,
    }

    #[async_trait::async_trait]
    impl ContextProvider for FailedProcedureContextProvider {
        // Forwarded unchanged to the wrapped mock provider.
        async fn procedure_state(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<ProcedureState>> {
            self.inner.procedure_state(procedure_id).await
        }

        // Ignores the requested id: every caller observes the same channel.
        async fn procedure_state_receiver(
            &self,
            _procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
            Ok(Some(self.receiver.clone()))
        }

        // Forwarded unchanged to the wrapped mock provider.
        async fn try_put_poison(
            &self,
            key: &common_procedure::PoisonKey,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<()> {
            self.inner.try_put_poison(key, procedure_id).await
        }

        // Forwarded unchanged to the wrapped mock provider.
        async fn acquire_lock(
            &self,
            key: &common_procedure::StringKey,
        ) -> common_procedure::local::DynamicKeyLockGuard {
            self.inner.acquire_lock(key).await
        }
    }

    /// Builds the persistent context used by every test in this module.
    fn new_persistent_ctx() -> PersistentContext {
        let table_name =
            table::table_name::TableName::new("test_catalog", "test_schema", "test_table");
        PersistentContext::new(table_name, 1024, None)
    }

    /// Builds a [`ProcedureMeta`] for plan index 0 with random group and procedure ids.
    fn random_procedure_meta() -> ProcedureMeta {
        ProcedureMeta {
            plan_index: 0,
            group_id: uuid::Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }
    }

    #[tokio::test]
    async fn test_collect_returns_error_when_unknown_exists() {
        let env = TestingEnv::new();
        let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(())));
        let mut ctx = crate::procedure::repartition::Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            new_persistent_ctx(),
        );
        // Nothing is in flight; the state already carries one unknown entry.
        let mut state = Collect {
            unknown_procedures: vec![random_procedure_meta()],
            ..Collect::new(Vec::new())
        };
        let procedure_ctx = TestingEnv::procedure_context();

        let result = state.next(&mut ctx, &procedure_ctx).await;

        assert!(!result.unwrap_err().is_retryable());
    }

    #[tokio::test]
    async fn test_collect_returns_error_when_failed_exists() {
        let env = TestingEnv::new();
        let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(())));
        let mut ctx = crate::procedure::repartition::Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            new_persistent_ctx(),
        );

        // Publish a failed state on the channel that the provider hands out.
        let (tx, rx) = watch::channel(ProcedureState::Running);
        let failure = ProcedureError::external(MockError::new(StatusCode::Internal));
        tx.send(ProcedureState::failed(Arc::new(failure))).unwrap();

        let provider = FailedProcedureContextProvider {
            receiver: rx,
            inner: MockContextProvider::default(),
        };
        let procedure_ctx = ProcedureContext {
            procedure_id: ProcedureId::random(),
            provider: Arc::new(provider),
        };
        let mut state = Collect::new(vec![random_procedure_meta()]);

        let result = state.next(&mut ctx, &procedure_ctx).await;
        let err = result.unwrap_err();

        assert_eq!(state.failed_procedures.len(), 1);
        assert_eq!(state.unknown_procedures.len(), 0);
        assert!(!err.is_retryable());
    }
}
267}