common_meta/ddl/
drop_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16
17use api::v1::flow::{flow_request, DropRequest, FlowRequest};
18use async_trait::async_trait;
19use common_catalog::format_full_flow_name;
20use common_error::ext::ErrorExt;
21use common_error::status_code::StatusCode;
22use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
25};
26use common_telemetry::info;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use strum::AsRefStr;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
34use crate::ddl::DdlContext;
35use crate::error::{self, Result};
36use crate::flow_name::FlowName;
37use crate::instruction::{CacheIdent, DropFlow};
38use crate::key::flow::flow_info::FlowInfoValue;
39use crate::key::flow::flow_route::FlowRouteValue;
40use crate::lock_key::{CatalogLock, FlowLock};
41use crate::metrics;
42use crate::rpc::ddl::DropFlowTask;
43
44/// The procedure for dropping a flow.
45pub struct DropFlowProcedure {
46    /// The context of procedure runtime.
47    pub(crate) context: DdlContext,
48    /// The serializable data.
49    pub(crate) data: DropFlowData,
50}
51
52impl DropFlowProcedure {
53    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";
54
55    pub fn new(task: DropFlowTask, context: DdlContext) -> Self {
56        Self {
57            context,
58            data: DropFlowData {
59                state: DropFlowState::Prepare,
60                task,
61                flow_info_value: None,
62                flow_route_values: vec![],
63            },
64        }
65    }
66
67    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
68        let data: DropFlowData = serde_json::from_str(json).context(FromJsonSnafu)?;
69
70        Ok(Self { context, data })
71    }
72
73    /// Checks whether flow exists.
74    /// - Early returns if flow not exists and `drop_if_exists` is `true`.
75    /// - Throws an error if flow not exists and `drop_if_exists` is `false`.
76    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
77        let catalog_name = &self.data.task.catalog_name;
78        let flow_name = &self.data.task.flow_name;
79        let exists = self
80            .context
81            .flow_metadata_manager
82            .flow_name_manager()
83            .exists(catalog_name, flow_name)
84            .await?;
85
86        if !exists && self.data.task.drop_if_exists {
87            return Ok(Status::done());
88        }
89
90        ensure!(
91            exists,
92            error::FlowNotFoundSnafu {
93                flow_name: format_full_flow_name(catalog_name, flow_name)
94            }
95        );
96
97        self.fill_flow_metadata().await?;
98        self.data.state = DropFlowState::DeleteMetadata;
99        Ok(Status::executing(true))
100    }
101
102    async fn on_flownode_drop_flows(&self) -> Result<Status> {
103        // Safety: checked
104        let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
105        let flow_id = self.data.task.flow_id;
106        let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
107
108        for FlowRouteValue { peer } in &self.data.flow_route_values {
109            let requester = self.context.node_manager.flownode(peer).await;
110            let request = FlowRequest {
111                body: Some(flow_request::Body::Drop(DropRequest {
112                    flow_id: Some(api::v1::FlowId { id: flow_id }),
113                })),
114                ..Default::default()
115            };
116
117            drop_flow_tasks.push(async move {
118                if let Err(err) = requester.handle(request).await {
119                    if err.status_code() != StatusCode::FlowNotFound {
120                        return Err(add_peer_context_if_needed(peer.clone())(err));
121                    }
122                }
123                Ok(())
124            });
125        }
126
127        join_all(drop_flow_tasks)
128            .await
129            .into_iter()
130            .collect::<Result<Vec<_>>>()?;
131
132        Ok(Status::done())
133    }
134
135    async fn on_delete_metadata(&mut self) -> Result<Status> {
136        let flow_id = self.data.task.flow_id;
137        self.context
138            .flow_metadata_manager
139            .destroy_flow_metadata(
140                flow_id,
141                // Safety: checked
142                self.data.flow_info_value.as_ref().unwrap(),
143            )
144            .await?;
145        info!("Deleted flow metadata for flow {flow_id}");
146        self.data.state = DropFlowState::InvalidateFlowCache;
147        Ok(Status::executing(true))
148    }
149
150    async fn on_broadcast(&mut self) -> Result<Status> {
151        let flow_id = self.data.task.flow_id;
152        let ctx = Context {
153            subject: Some("Invalidate flow cache by dropping flow".to_string()),
154        };
155        let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
156
157        let flow_part2nodes = flow_info_value
158            .flownode_ids()
159            .clone()
160            .into_iter()
161            .collect::<Vec<_>>();
162
163        self.context
164            .cache_invalidator
165            .invalidate(
166                &ctx,
167                &[
168                    CacheIdent::FlowId(flow_id),
169                    CacheIdent::FlowName(FlowName {
170                        catalog_name: flow_info_value.catalog_name.to_string(),
171                        flow_name: flow_info_value.flow_name.to_string(),
172                    }),
173                    CacheIdent::DropFlow(DropFlow {
174                        flow_id,
175                        source_table_ids: flow_info_value.source_table_ids.clone(),
176                        flow_part2node_id: flow_part2nodes,
177                    }),
178                ],
179            )
180            .await?;
181        self.data.state = DropFlowState::DropFlows;
182        Ok(Status::executing(true))
183    }
184}
185
186#[async_trait]
187impl Procedure for DropFlowProcedure {
188    fn type_name(&self) -> &str {
189        Self::TYPE_NAME
190    }
191
192    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
193        let state = &self.data.state;
194        let _timer = metrics::METRIC_META_PROCEDURE_DROP_FLOW
195            .with_label_values(&[state.as_ref()])
196            .start_timer();
197
198        match self.data.state {
199            DropFlowState::Prepare => self.on_prepare().await,
200            DropFlowState::DeleteMetadata => self.on_delete_metadata().await,
201            DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
202            DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
203        }
204        .map_err(map_to_procedure_error)
205    }
206
207    fn dump(&self) -> ProcedureResult<String> {
208        serde_json::to_string(&self.data).context(ToJsonSnafu)
209    }
210
211    fn lock_key(&self) -> LockKey {
212        let catalog_name = &self.data.task.catalog_name;
213        let flow_id = self.data.task.flow_id;
214
215        let lock_key = vec![
216            CatalogLock::Read(catalog_name).into(),
217            FlowLock::Write(flow_id).into(),
218        ];
219
220        LockKey::new(lock_key)
221    }
222}
223
224/// The serializable data
225#[derive(Debug, Serialize, Deserialize)]
226pub(crate) struct DropFlowData {
227    state: DropFlowState,
228    task: DropFlowTask,
229    pub(crate) flow_info_value: Option<FlowInfoValue>,
230    pub(crate) flow_route_values: Vec<FlowRouteValue>,
231}
232
233/// The state of drop flow
234#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
235enum DropFlowState {
236    /// Prepares to drop the flow
237    Prepare,
238    /// Deletes metadata
239    DeleteMetadata,
240    /// Invalidate flow cache
241    InvalidateFlowCache,
242    /// Drop flows on flownode
243    DropFlows,
244    // TODO(weny): support to rollback
245}