1mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, QueryContext};
51
/// The procedure that creates a flow, or replaces an existing one when `OR REPLACE` is given.
///
/// It steps through the states of [`CreateFlowState`]; all state that must survive a
/// restart lives in `data`.
pub struct CreateFlowProcedure {
    /// DDL context; the steps use its flow/table metadata managers, node manager and
    /// cache invalidator.
    pub context: DdlContext,
    /// Procedure state; serialized to JSON by `dump` and restored by `from_json`.
    pub data: CreateFlowData,
}
57
58impl CreateFlowProcedure {
59 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61 pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63 Self {
64 context,
65 data: CreateFlowData {
66 task,
67 flow_id: None,
68 peers: vec![],
69 source_table_ids: vec![],
70 query_context,
71 state: CreateFlowState::Prepare,
72 prev_flow_info_value: None,
73 flow_type: None,
74 },
75 }
76 }
77
78 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
80 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
81 Ok(CreateFlowProcedure { context, data })
82 }
83
84 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
85 let catalog_name = &self.data.task.catalog_name;
86 let flow_name = &self.data.task.flow_name;
87 let sink_table_name = &self.data.task.sink_table_name;
88 let create_if_not_exists = self.data.task.create_if_not_exists;
89 let or_replace = self.data.task.or_replace;
90
91 let flow_name_value = self
92 .context
93 .flow_metadata_manager
94 .flow_name_manager()
95 .get(catalog_name, flow_name)
96 .await?;
97
98 if create_if_not_exists && or_replace {
99 return error::UnsupportedSnafu {
101 operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
102 }
103 .fail();
104 }
105
106 if let Some(value) = flow_name_value {
107 ensure!(
108 create_if_not_exists || or_replace,
109 error::FlowAlreadyExistsSnafu {
110 flow_name: format_full_flow_name(catalog_name, flow_name),
111 }
112 );
113
114 let flow_id = value.flow_id();
115 if create_if_not_exists {
116 info!("Flow already exists, flow_id: {}", flow_id);
117 return Ok(Status::done_with_output(flow_id));
118 }
119
120 let flow_id = value.flow_id();
121 let peers = self
122 .context
123 .flow_metadata_manager
124 .flow_route_manager()
125 .routes(flow_id)
126 .await?
127 .into_iter()
128 .map(|(_, value)| value.peer)
129 .collect::<Vec<_>>();
130 self.data.flow_id = Some(flow_id);
131 self.data.peers = peers;
132 info!("Replacing flow, flow_id: {}", flow_id);
133
134 let flow_info_value = self
135 .context
136 .flow_metadata_manager
137 .flow_info_manager()
138 .get_raw(flow_id)
139 .await?;
140
141 ensure!(
142 flow_info_value.is_some(),
143 error::FlowNotFoundSnafu {
144 flow_name: format_full_flow_name(catalog_name, flow_name),
145 }
146 );
147
148 self.data.prev_flow_info_value = flow_info_value;
149 }
150
151 let exists = self
153 .context
154 .table_metadata_manager
155 .table_name_manager()
156 .exists(TableNameKey::new(
157 &sink_table_name.catalog_name,
158 &sink_table_name.schema_name,
159 &sink_table_name.table_name,
160 ))
161 .await?;
162 if exists {
165 common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
166 }
167
168 self.collect_source_tables().await?;
169 if self.data.flow_id.is_none() {
170 self.allocate_flow_id().await?;
171 }
172 self.data.state = CreateFlowState::CreateFlows;
173 self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
175
176 Ok(Status::executing(true))
177 }
178
179 async fn on_flownode_create_flows(&mut self) -> Result<Status> {
180 let mut create_flow = Vec::with_capacity(self.data.peers.len());
182 for peer in &self.data.peers {
183 let requester = self.context.node_manager.flownode(peer).await;
184 let request = FlowRequest {
185 header: Some(FlowRequestHeader {
186 tracing_context: TracingContext::from_current_span().to_w3c(),
187 query_context: Some(self.data.query_context.clone().into()),
188 }),
189 body: Some(PbFlowRequest::Create((&self.data).into())),
190 };
191 create_flow.push(async move {
192 requester
193 .handle(request)
194 .await
195 .map_err(add_peer_context_if_needed(peer.clone()))
196 });
197 }
198 info!(
199 "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
200 self.data.flow_id, self.data.flow_type, self.data.peers
201 );
202 join_all(create_flow)
203 .await
204 .into_iter()
205 .collect::<Result<Vec<_>>>()?;
206
207 self.data.state = CreateFlowState::CreateMetadata;
208 Ok(Status::executing(true))
209 }
210
211 async fn on_create_metadata(&mut self) -> Result<Status> {
216 let flow_id = self.data.flow_id.unwrap();
218 let (flow_info, flow_routes) = (&self.data).into();
219 if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
220 && self.data.task.or_replace
221 {
222 self.context
223 .flow_metadata_manager
224 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
225 .await?;
226 info!("Replaced flow metadata for flow {flow_id}");
227 } else {
228 self.context
229 .flow_metadata_manager
230 .create_flow_metadata(flow_id, flow_info, flow_routes)
231 .await?;
232 info!("Created flow metadata for flow {flow_id}");
233 }
234
235 self.data.state = CreateFlowState::InvalidateFlowCache;
236 Ok(Status::executing(true))
237 }
238
239 async fn on_broadcast(&mut self) -> Result<Status> {
240 debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
241 let flow_id = self.data.flow_id.unwrap();
243 let ctx = Context {
244 subject: Some("Invalidate flow cache by creating flow".to_string()),
245 };
246
247 self.context
248 .cache_invalidator
249 .invalidate(
250 &ctx,
251 &[
252 CacheIdent::CreateFlow(CreateFlow {
253 source_table_ids: self.data.source_table_ids.clone(),
254 flownodes: self.data.peers.clone(),
255 }),
256 CacheIdent::FlowId(flow_id),
257 ],
258 )
259 .await?;
260
261 Ok(Status::done_with_output(flow_id))
262 }
263}
264
265#[async_trait]
266impl Procedure for CreateFlowProcedure {
267 fn type_name(&self) -> &str {
268 Self::TYPE_NAME
269 }
270
271 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
272 let state = &self.data.state;
273
274 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
275 .with_label_values(&[state.as_ref()])
276 .start_timer();
277
278 match state {
279 CreateFlowState::Prepare => self.on_prepare().await,
280 CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
281 CreateFlowState::CreateMetadata => self.on_create_metadata().await,
282 CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
283 }
284 .map_err(handle_retry_error)
285 }
286
287 fn dump(&self) -> ProcedureResult<String> {
288 serde_json::to_string(&self.data).context(ToJsonSnafu)
289 }
290
291 fn lock_key(&self) -> LockKey {
292 let catalog_name = &self.data.task.catalog_name;
293 let flow_name = &self.data.task.flow_name;
294 let sink_table_name = &self.data.task.sink_table_name;
295
296 LockKey::new(vec![
297 CatalogLock::Read(catalog_name).into(),
298 TableNameLock::new(
299 &sink_table_name.catalog_name,
300 &sink_table_name.schema_name,
301 &sink_table_name.catalog_name,
302 )
303 .into(),
304 FlowNameLock::new(catalog_name, flow_name).into(),
305 ])
306 }
307}
308
309pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
310 let flow_type = flow_task
311 .flow_options
312 .get(FlowType::FLOW_TYPE_KEY)
313 .map(|s| s.as_str());
314 match flow_type {
315 Some(FlowType::BATCHING) => Ok(FlowType::Batching),
316 Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
317 Some(unknown) => UnexpectedSnafu {
318 err_msg: format!("Unknown flow type: {}", unknown),
319 }
320 .fail(),
321 None => Ok(FlowType::Batching),
322 }
323}
324
325#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
327pub enum CreateFlowState {
328 Prepare,
330 CreateFlows,
332 InvalidateFlowCache,
334 CreateMetadata,
336}
337
338#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
340pub enum FlowType {
341 Batching,
343 Streaming,
345}
346
347impl FlowType {
348 pub const BATCHING: &str = "batching";
349 pub const STREAMING: &str = "streaming";
350 pub const FLOW_TYPE_KEY: &str = "flow_type";
351}
352
353impl Default for FlowType {
354 fn default() -> Self {
355 Self::Batching
356 }
357}
358
359impl fmt::Display for FlowType {
360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 match self {
362 FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
363 FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
364 }
365 }
366}
367
/// Serializable state of a [`CreateFlowProcedure`]; this is what `dump` persists.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// The step the procedure runs next.
    pub(crate) state: CreateFlowState,
    /// The create-flow task being executed.
    pub(crate) task: CreateFlowTask,
    /// Flow id: reused from the existing flow on `OR REPLACE`, otherwise assigned by
    /// `allocate_flow_id` in the prepare step. `None` before that.
    pub(crate) flow_id: Option<FlowId>,
    /// Flownodes the flow is created on; the index of each peer is used as its partition id
    /// in the flow routes. On `OR REPLACE` these come from the existing flow's routes.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the flow's source tables (filled by `collect_source_tables`, defined elsewhere).
    pub(crate) source_table_ids: Vec<TableId>,
    /// Query context forwarded to flownodes and stored in the flow info.
    pub(crate) query_context: QueryContext,
    /// Raw flow info of the flow being replaced; only set for `OR REPLACE` on an existing flow.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Flow type resolved from the task options in the prepare step; `None` before that.
    pub(crate) flow_type: Option<FlowType>,
}
382
383impl From<&CreateFlowData> for CreateRequest {
384 fn from(value: &CreateFlowData) -> Self {
385 let flow_id = value.flow_id.unwrap();
386 let source_table_ids = &value.source_table_ids;
387
388 let mut req = CreateRequest {
389 flow_id: Some(api::v1::FlowId { id: flow_id }),
390 source_table_ids: source_table_ids
391 .iter()
392 .map(|table_id| api::v1::TableId { id: *table_id })
393 .collect_vec(),
394 sink_table_name: Some(value.task.sink_table_name.clone().into()),
395 create_if_not_exists: true,
397 or_replace: value.task.or_replace,
398 expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
399 comment: value.task.comment.clone(),
400 sql: value.task.sql.clone(),
401 flow_options: value.task.flow_options.clone(),
402 };
403
404 let flow_type = value.flow_type.unwrap_or_default().to_string();
405 req.flow_options
406 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
407 req
408 }
409}
410
411impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
412 fn from(value: &CreateFlowData) -> Self {
413 let CreateFlowTask {
414 catalog_name,
415 flow_name,
416 sink_table_name,
417 expire_after,
418 comment,
419 sql,
420 flow_options: mut options,
421 ..
422 } = value.task.clone();
423
424 let flownode_ids = value
425 .peers
426 .iter()
427 .enumerate()
428 .map(|(idx, peer)| (idx as u32, peer.id))
429 .collect::<BTreeMap<_, _>>();
430 let flow_routes = value
431 .peers
432 .iter()
433 .enumerate()
434 .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
435 .collect::<Vec<_>>();
436
437 let flow_type = value.flow_type.unwrap_or_default().to_string();
438 options.insert("flow_type".to_string(), flow_type);
439
440 let mut create_time = chrono::Utc::now();
441 if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
442 && value.task.or_replace
443 {
444 create_time = prev_flow_value.get_inner_ref().created_time;
445 }
446
447 let flow_info: FlowInfoValue = FlowInfoValue {
448 source_table_ids: value.source_table_ids.clone(),
449 sink_table_name,
450 flownode_ids,
451 catalog_name,
452 query_context: Some(value.query_context.clone()),
453 flow_name,
454 raw_sql: sql,
455 expire_after,
456 comment,
457 options,
458 created_time: create_time,
459 updated_time: chrono::Utc::now(),
460 };
461
462 (flow_info, flow_routes)
463 }
464}