common_meta/ddl/
drop_flow.rs1mod metadata;
16use api::v1::flow::{flow_request, DropRequest, FlowRequest};
17use async_trait::async_trait;
18use common_catalog::format_full_flow_name;
19use common_error::ext::ErrorExt;
20use common_error::status_code::StatusCode;
21use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
22use common_procedure::{
23 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
24};
25use common_telemetry::info;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{ensure, ResultExt};
29use strum::AsRefStr;
30
31use crate::cache_invalidator::Context;
32use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
33use crate::ddl::DdlContext;
34use crate::error::{self, Result};
35use crate::flow_name::FlowName;
36use crate::instruction::{CacheIdent, DropFlow};
37use crate::key::flow::flow_info::FlowInfoValue;
38use crate::key::flow::flow_route::FlowRouteValue;
39use crate::lock_key::{CatalogLock, FlowLock};
40use crate::metrics;
41use crate::rpc::ddl::DropFlowTask;
42
43pub struct DropFlowProcedure {
45 pub(crate) context: DdlContext,
47 pub(crate) data: DropFlowData,
49}
50
51impl DropFlowProcedure {
52 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";
53
54 pub fn new(task: DropFlowTask, context: DdlContext) -> Self {
55 Self {
56 context,
57 data: DropFlowData {
58 state: DropFlowState::Prepare,
59 task,
60 flow_info_value: None,
61 flow_route_values: vec![],
62 },
63 }
64 }
65
66 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
67 let data: DropFlowData = serde_json::from_str(json).context(FromJsonSnafu)?;
68
69 Ok(Self { context, data })
70 }
71
72 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
76 let catalog_name = &self.data.task.catalog_name;
77 let flow_name = &self.data.task.flow_name;
78 let exists = self
79 .context
80 .flow_metadata_manager
81 .flow_name_manager()
82 .exists(catalog_name, flow_name)
83 .await?;
84
85 if !exists && self.data.task.drop_if_exists {
86 return Ok(Status::done());
87 }
88
89 ensure!(
90 exists,
91 error::FlowNotFoundSnafu {
92 flow_name: format_full_flow_name(catalog_name, flow_name)
93 }
94 );
95
96 self.fill_flow_metadata().await?;
97 self.data.state = DropFlowState::DeleteMetadata;
98 Ok(Status::executing(true))
99 }
100
101 async fn on_flownode_drop_flows(&self) -> Result<Status> {
102 let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
104 let flow_id = self.data.task.flow_id;
105 let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
106
107 for FlowRouteValue { peer } in &self.data.flow_route_values {
108 let requester = self.context.node_manager.flownode(peer).await;
109 let request = FlowRequest {
110 body: Some(flow_request::Body::Drop(DropRequest {
111 flow_id: Some(api::v1::FlowId { id: flow_id }),
112 })),
113 ..Default::default()
114 };
115
116 drop_flow_tasks.push(async move {
117 if let Err(err) = requester.handle(request).await {
118 if err.status_code() != StatusCode::FlowNotFound {
119 return Err(add_peer_context_if_needed(peer.clone())(err));
120 }
121 }
122 Ok(())
123 });
124 }
125
126 join_all(drop_flow_tasks)
127 .await
128 .into_iter()
129 .collect::<Result<Vec<_>>>()?;
130
131 Ok(Status::done())
132 }
133
134 async fn on_delete_metadata(&mut self) -> Result<Status> {
135 let flow_id = self.data.task.flow_id;
136 self.context
137 .flow_metadata_manager
138 .destroy_flow_metadata(
139 flow_id,
140 self.data.flow_info_value.as_ref().unwrap(),
142 )
143 .await?;
144 info!("Deleted flow metadata for flow {flow_id}");
145 self.data.state = DropFlowState::InvalidateFlowCache;
146 Ok(Status::executing(true))
147 }
148
149 async fn on_broadcast(&mut self) -> Result<Status> {
150 let flow_id = self.data.task.flow_id;
151 let ctx = Context {
152 subject: Some("Invalidate flow cache by dropping flow".to_string()),
153 };
154 let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
155
156 self.context
157 .cache_invalidator
158 .invalidate(
159 &ctx,
160 &[
161 CacheIdent::FlowId(flow_id),
162 CacheIdent::FlowName(FlowName {
163 catalog_name: flow_info_value.catalog_name.to_string(),
164 flow_name: flow_info_value.flow_name.to_string(),
165 }),
166 CacheIdent::DropFlow(DropFlow {
167 source_table_ids: flow_info_value.source_table_ids.clone(),
168 flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
169 }),
170 ],
171 )
172 .await?;
173 self.data.state = DropFlowState::DropFlows;
174 Ok(Status::executing(true))
175 }
176}
177
178#[async_trait]
179impl Procedure for DropFlowProcedure {
180 fn type_name(&self) -> &str {
181 Self::TYPE_NAME
182 }
183
184 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
185 let state = &self.data.state;
186 let _timer = metrics::METRIC_META_PROCEDURE_DROP_FLOW
187 .with_label_values(&[state.as_ref()])
188 .start_timer();
189
190 match self.data.state {
191 DropFlowState::Prepare => self.on_prepare().await,
192 DropFlowState::DeleteMetadata => self.on_delete_metadata().await,
193 DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
194 DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
195 }
196 .map_err(handle_retry_error)
197 }
198
199 fn dump(&self) -> ProcedureResult<String> {
200 serde_json::to_string(&self.data).context(ToJsonSnafu)
201 }
202
203 fn lock_key(&self) -> LockKey {
204 let catalog_name = &self.data.task.catalog_name;
205 let flow_id = self.data.task.flow_id;
206
207 let lock_key = vec![
208 CatalogLock::Read(catalog_name).into(),
209 FlowLock::Write(flow_id).into(),
210 ];
211
212 LockKey::new(lock_key)
213 }
214}
215
216#[derive(Debug, Serialize, Deserialize)]
218pub(crate) struct DropFlowData {
219 state: DropFlowState,
220 task: DropFlowTask,
221 pub(crate) flow_info_value: Option<FlowInfoValue>,
222 pub(crate) flow_route_values: Vec<FlowRouteValue>,
223}
224
225#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
227enum DropFlowState {
228 Prepare,
230 DeleteMetadata,
232 InvalidateFlowCache,
234 DropFlows,
236 }