common_meta/ddl/
drop_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16use api::v1::flow::{flow_request, DropRequest, FlowRequest};
17use async_trait::async_trait;
18use common_catalog::format_full_flow_name;
19use common_error::ext::ErrorExt;
20use common_error::status_code::StatusCode;
21use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
22use common_procedure::{
23    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
24};
25use common_telemetry::info;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{ensure, ResultExt};
29use strum::AsRefStr;
30
31use crate::cache_invalidator::Context;
32use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
33use crate::ddl::DdlContext;
34use crate::error::{self, Result};
35use crate::flow_name::FlowName;
36use crate::instruction::{CacheIdent, DropFlow};
37use crate::key::flow::flow_info::FlowInfoValue;
38use crate::key::flow::flow_route::FlowRouteValue;
39use crate::lock_key::{CatalogLock, FlowLock};
40use crate::metrics;
41use crate::rpc::ddl::DropFlowTask;
42
43/// The procedure for dropping a flow.
44pub struct DropFlowProcedure {
45    /// The context of procedure runtime.
46    pub(crate) context: DdlContext,
47    /// The serializable data.
48    pub(crate) data: DropFlowData,
49}
50
51impl DropFlowProcedure {
52    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";
53
54    pub fn new(task: DropFlowTask, context: DdlContext) -> Self {
55        Self {
56            context,
57            data: DropFlowData {
58                state: DropFlowState::Prepare,
59                task,
60                flow_info_value: None,
61                flow_route_values: vec![],
62            },
63        }
64    }
65
66    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
67        let data: DropFlowData = serde_json::from_str(json).context(FromJsonSnafu)?;
68
69        Ok(Self { context, data })
70    }
71
72    /// Checks whether flow exists.
73    /// - Early returns if flow not exists and `drop_if_exists` is `true`.
74    /// - Throws an error if flow not exists and `drop_if_exists` is `false`.
75    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
76        let catalog_name = &self.data.task.catalog_name;
77        let flow_name = &self.data.task.flow_name;
78        let exists = self
79            .context
80            .flow_metadata_manager
81            .flow_name_manager()
82            .exists(catalog_name, flow_name)
83            .await?;
84
85        if !exists && self.data.task.drop_if_exists {
86            return Ok(Status::done());
87        }
88
89        ensure!(
90            exists,
91            error::FlowNotFoundSnafu {
92                flow_name: format_full_flow_name(catalog_name, flow_name)
93            }
94        );
95
96        self.fill_flow_metadata().await?;
97        self.data.state = DropFlowState::DeleteMetadata;
98        Ok(Status::executing(true))
99    }
100
101    async fn on_flownode_drop_flows(&self) -> Result<Status> {
102        // Safety: checked
103        let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
104        let flow_id = self.data.task.flow_id;
105        let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
106
107        for FlowRouteValue { peer } in &self.data.flow_route_values {
108            let requester = self.context.node_manager.flownode(peer).await;
109            let request = FlowRequest {
110                body: Some(flow_request::Body::Drop(DropRequest {
111                    flow_id: Some(api::v1::FlowId { id: flow_id }),
112                })),
113                ..Default::default()
114            };
115
116            drop_flow_tasks.push(async move {
117                if let Err(err) = requester.handle(request).await {
118                    if err.status_code() != StatusCode::FlowNotFound {
119                        return Err(add_peer_context_if_needed(peer.clone())(err));
120                    }
121                }
122                Ok(())
123            });
124        }
125
126        join_all(drop_flow_tasks)
127            .await
128            .into_iter()
129            .collect::<Result<Vec<_>>>()?;
130
131        Ok(Status::done())
132    }
133
134    async fn on_delete_metadata(&mut self) -> Result<Status> {
135        let flow_id = self.data.task.flow_id;
136        self.context
137            .flow_metadata_manager
138            .destroy_flow_metadata(
139                flow_id,
140                // Safety: checked
141                self.data.flow_info_value.as_ref().unwrap(),
142            )
143            .await?;
144        info!("Deleted flow metadata for flow {flow_id}");
145        self.data.state = DropFlowState::InvalidateFlowCache;
146        Ok(Status::executing(true))
147    }
148
149    async fn on_broadcast(&mut self) -> Result<Status> {
150        let flow_id = self.data.task.flow_id;
151        let ctx = Context {
152            subject: Some("Invalidate flow cache by dropping flow".to_string()),
153        };
154        let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
155
156        self.context
157            .cache_invalidator
158            .invalidate(
159                &ctx,
160                &[
161                    CacheIdent::FlowId(flow_id),
162                    CacheIdent::FlowName(FlowName {
163                        catalog_name: flow_info_value.catalog_name.to_string(),
164                        flow_name: flow_info_value.flow_name.to_string(),
165                    }),
166                    CacheIdent::DropFlow(DropFlow {
167                        source_table_ids: flow_info_value.source_table_ids.clone(),
168                        flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
169                    }),
170                ],
171            )
172            .await?;
173        self.data.state = DropFlowState::DropFlows;
174        Ok(Status::executing(true))
175    }
176}
177
178#[async_trait]
179impl Procedure for DropFlowProcedure {
180    fn type_name(&self) -> &str {
181        Self::TYPE_NAME
182    }
183
184    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
185        let state = &self.data.state;
186        let _timer = metrics::METRIC_META_PROCEDURE_DROP_FLOW
187            .with_label_values(&[state.as_ref()])
188            .start_timer();
189
190        match self.data.state {
191            DropFlowState::Prepare => self.on_prepare().await,
192            DropFlowState::DeleteMetadata => self.on_delete_metadata().await,
193            DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
194            DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
195        }
196        .map_err(handle_retry_error)
197    }
198
199    fn dump(&self) -> ProcedureResult<String> {
200        serde_json::to_string(&self.data).context(ToJsonSnafu)
201    }
202
203    fn lock_key(&self) -> LockKey {
204        let catalog_name = &self.data.task.catalog_name;
205        let flow_id = self.data.task.flow_id;
206
207        let lock_key = vec![
208            CatalogLock::Read(catalog_name).into(),
209            FlowLock::Write(flow_id).into(),
210        ];
211
212        LockKey::new(lock_key)
213    }
214}
215
216/// The serializable data
217#[derive(Debug, Serialize, Deserialize)]
218pub(crate) struct DropFlowData {
219    state: DropFlowState,
220    task: DropFlowTask,
221    pub(crate) flow_info_value: Option<FlowInfoValue>,
222    pub(crate) flow_route_values: Vec<FlowRouteValue>,
223}
224
225/// The state of drop flow
226#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
227enum DropFlowState {
228    /// Prepares to drop the flow
229    Prepare,
230    /// Deletes metadata
231    DeleteMetadata,
232    /// Invalidate flow cache
233    InvalidateFlowCache,
234    /// Drop flows on flownode
235    DropFlows,
236    // TODO(weny): support to rollback
237}