// common_meta/ddl/drop_flow.rs
mod metadata;
16
17use api::v1::flow::{flow_request, DropRequest, FlowRequest};
18use async_trait::async_trait;
19use common_catalog::format_full_flow_name;
20use common_error::ext::ErrorExt;
21use common_error::status_code::StatusCode;
22use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
25};
26use common_telemetry::info;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use strum::AsRefStr;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
34use crate::ddl::DdlContext;
35use crate::error::{self, Result};
36use crate::flow_name::FlowName;
37use crate::instruction::{CacheIdent, DropFlow};
38use crate::key::flow::flow_info::FlowInfoValue;
39use crate::key::flow::flow_route::FlowRouteValue;
40use crate::lock_key::{CatalogLock, FlowLock};
41use crate::metrics;
42use crate::rpc::ddl::DropFlowTask;
43
44pub struct DropFlowProcedure {
46 pub(crate) context: DdlContext,
48 pub(crate) data: DropFlowData,
50}
51
52impl DropFlowProcedure {
53 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";
54
55 pub fn new(task: DropFlowTask, context: DdlContext) -> Self {
56 Self {
57 context,
58 data: DropFlowData {
59 state: DropFlowState::Prepare,
60 task,
61 flow_info_value: None,
62 flow_route_values: vec![],
63 },
64 }
65 }
66
67 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
68 let data: DropFlowData = serde_json::from_str(json).context(FromJsonSnafu)?;
69
70 Ok(Self { context, data })
71 }
72
73 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
77 let catalog_name = &self.data.task.catalog_name;
78 let flow_name = &self.data.task.flow_name;
79 let exists = self
80 .context
81 .flow_metadata_manager
82 .flow_name_manager()
83 .exists(catalog_name, flow_name)
84 .await?;
85
86 if !exists && self.data.task.drop_if_exists {
87 return Ok(Status::done());
88 }
89
90 ensure!(
91 exists,
92 error::FlowNotFoundSnafu {
93 flow_name: format_full_flow_name(catalog_name, flow_name)
94 }
95 );
96
97 self.fill_flow_metadata().await?;
98 self.data.state = DropFlowState::DeleteMetadata;
99 Ok(Status::executing(true))
100 }
101
102 async fn on_flownode_drop_flows(&self) -> Result<Status> {
103 let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
105 let flow_id = self.data.task.flow_id;
106 let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
107
108 for FlowRouteValue { peer } in &self.data.flow_route_values {
109 let requester = self.context.node_manager.flownode(peer).await;
110 let request = FlowRequest {
111 body: Some(flow_request::Body::Drop(DropRequest {
112 flow_id: Some(api::v1::FlowId { id: flow_id }),
113 })),
114 ..Default::default()
115 };
116
117 drop_flow_tasks.push(async move {
118 if let Err(err) = requester.handle(request).await {
119 if err.status_code() != StatusCode::FlowNotFound {
120 return Err(add_peer_context_if_needed(peer.clone())(err));
121 }
122 }
123 Ok(())
124 });
125 }
126
127 join_all(drop_flow_tasks)
128 .await
129 .into_iter()
130 .collect::<Result<Vec<_>>>()?;
131
132 Ok(Status::done())
133 }
134
135 async fn on_delete_metadata(&mut self) -> Result<Status> {
136 let flow_id = self.data.task.flow_id;
137 self.context
138 .flow_metadata_manager
139 .destroy_flow_metadata(
140 flow_id,
141 self.data.flow_info_value.as_ref().unwrap(),
143 )
144 .await?;
145 info!("Deleted flow metadata for flow {flow_id}");
146 self.data.state = DropFlowState::InvalidateFlowCache;
147 Ok(Status::executing(true))
148 }
149
150 async fn on_broadcast(&mut self) -> Result<Status> {
151 let flow_id = self.data.task.flow_id;
152 let ctx = Context {
153 subject: Some("Invalidate flow cache by dropping flow".to_string()),
154 };
155 let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
156
157 let flow_part2nodes = flow_info_value
158 .flownode_ids()
159 .clone()
160 .into_iter()
161 .collect::<Vec<_>>();
162
163 self.context
164 .cache_invalidator
165 .invalidate(
166 &ctx,
167 &[
168 CacheIdent::FlowId(flow_id),
169 CacheIdent::FlowName(FlowName {
170 catalog_name: flow_info_value.catalog_name.to_string(),
171 flow_name: flow_info_value.flow_name.to_string(),
172 }),
173 CacheIdent::DropFlow(DropFlow {
174 flow_id,
175 source_table_ids: flow_info_value.source_table_ids.clone(),
176 flow_part2node_id: flow_part2nodes,
177 }),
178 ],
179 )
180 .await?;
181 self.data.state = DropFlowState::DropFlows;
182 Ok(Status::executing(true))
183 }
184}
185
186#[async_trait]
187impl Procedure for DropFlowProcedure {
188 fn type_name(&self) -> &str {
189 Self::TYPE_NAME
190 }
191
192 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
193 let state = &self.data.state;
194 let _timer = metrics::METRIC_META_PROCEDURE_DROP_FLOW
195 .with_label_values(&[state.as_ref()])
196 .start_timer();
197
198 match self.data.state {
199 DropFlowState::Prepare => self.on_prepare().await,
200 DropFlowState::DeleteMetadata => self.on_delete_metadata().await,
201 DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
202 DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
203 }
204 .map_err(map_to_procedure_error)
205 }
206
207 fn dump(&self) -> ProcedureResult<String> {
208 serde_json::to_string(&self.data).context(ToJsonSnafu)
209 }
210
211 fn lock_key(&self) -> LockKey {
212 let catalog_name = &self.data.task.catalog_name;
213 let flow_id = self.data.task.flow_id;
214
215 let lock_key = vec![
216 CatalogLock::Read(catalog_name).into(),
217 FlowLock::Write(flow_id).into(),
218 ];
219
220 LockKey::new(lock_key)
221 }
222}
223
224#[derive(Debug, Serialize, Deserialize)]
226pub(crate) struct DropFlowData {
227 state: DropFlowState,
228 task: DropFlowTask,
229 pub(crate) flow_info_value: Option<FlowInfoValue>,
230 pub(crate) flow_route_values: Vec<FlowRouteValue>,
231}
232
233#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
235enum DropFlowState {
236 Prepare,
238 DeleteMetadata,
240 InvalidateFlowCache,
242 DropFlows,
244 }