1mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
51
/// The DDL procedure that creates a flow, or replaces an existing one when `OR REPLACE` is set.
pub struct CreateFlowProcedure {
    /// DDL context providing the metadata managers, node manager and cache invalidator used by
    /// the procedure's steps.
    pub context: DdlContext,
    /// The procedure state; serialized by `dump` and restored by `from_json`.
    pub data: CreateFlowData,
}
57
58impl CreateFlowProcedure {
59 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61 pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63 Self {
64 context,
65 data: CreateFlowData {
66 task,
67 flow_id: None,
68 peers: vec![],
69 source_table_ids: vec![],
70 flow_context: query_context.into(), state: CreateFlowState::Prepare,
72 prev_flow_info_value: None,
73 did_replace: false,
74 flow_type: None,
75 },
76 }
77 }
78
79 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82 Ok(CreateFlowProcedure { context, data })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 let catalog_name = &self.data.task.catalog_name;
87 let flow_name = &self.data.task.flow_name;
88 let sink_table_name = &self.data.task.sink_table_name;
89 let create_if_not_exists = self.data.task.create_if_not_exists;
90 let or_replace = self.data.task.or_replace;
91
92 let flow_name_value = self
93 .context
94 .flow_metadata_manager
95 .flow_name_manager()
96 .get(catalog_name, flow_name)
97 .await?;
98
99 if create_if_not_exists && or_replace {
100 return error::UnsupportedSnafu {
102 operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
103 }
104 .fail();
105 }
106
107 if let Some(value) = flow_name_value {
108 ensure!(
109 create_if_not_exists || or_replace,
110 error::FlowAlreadyExistsSnafu {
111 flow_name: format_full_flow_name(catalog_name, flow_name),
112 }
113 );
114
115 let flow_id = value.flow_id();
116 if create_if_not_exists {
117 info!("Flow already exists, flow_id: {}", flow_id);
118 return Ok(Status::done_with_output(flow_id));
119 }
120
121 let flow_id = value.flow_id();
122 let peers = self
123 .context
124 .flow_metadata_manager
125 .flow_route_manager()
126 .routes(flow_id)
127 .await?
128 .into_iter()
129 .map(|(_, value)| value.peer)
130 .collect::<Vec<_>>();
131 self.data.flow_id = Some(flow_id);
132 self.data.peers = peers;
133 info!("Replacing flow, flow_id: {}", flow_id);
134
135 let flow_info_value = self
136 .context
137 .flow_metadata_manager
138 .flow_info_manager()
139 .get_raw(flow_id)
140 .await?;
141
142 ensure!(
143 flow_info_value.is_some(),
144 error::FlowNotFoundSnafu {
145 flow_name: format_full_flow_name(catalog_name, flow_name),
146 }
147 );
148
149 self.data.prev_flow_info_value = flow_info_value;
150 }
151
152 let exists = self
154 .context
155 .table_metadata_manager
156 .table_name_manager()
157 .exists(TableNameKey::new(
158 &sink_table_name.catalog_name,
159 &sink_table_name.schema_name,
160 &sink_table_name.table_name,
161 ))
162 .await?;
163 if exists {
166 common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
167 }
168
169 self.collect_source_tables().await?;
170
171 let sink_table_name = &self.data.task.sink_table_name;
173 if self
174 .data
175 .task
176 .source_table_names
177 .iter()
178 .any(|source| source == sink_table_name)
179 {
180 return error::UnsupportedSnafu {
181 operation: format!(
182 "Creating flow with source and sink table being the same: {}",
183 sink_table_name
184 ),
185 }
186 .fail();
187 }
188
189 if self.data.flow_id.is_none() {
190 self.allocate_flow_id().await?;
191 }
192 self.data.state = CreateFlowState::CreateFlows;
193 self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
195
196 Ok(Status::executing(true))
197 }
198
199 async fn on_flownode_create_flows(&mut self) -> Result<Status> {
200 let mut create_flow = Vec::with_capacity(self.data.peers.len());
202 for peer in &self.data.peers {
203 let requester = self.context.node_manager.flownode(peer).await;
204 let request = FlowRequest {
205 header: Some(FlowRequestHeader {
206 tracing_context: TracingContext::from_current_span().to_w3c(),
207 query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
209 }),
210 body: Some(PbFlowRequest::Create((&self.data).into())),
211 };
212 create_flow.push(async move {
213 requester
214 .handle(request)
215 .await
216 .map_err(add_peer_context_if_needed(peer.clone()))
217 });
218 }
219 info!(
220 "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
221 self.data.flow_id, self.data.flow_type, self.data.peers
222 );
223 join_all(create_flow)
224 .await
225 .into_iter()
226 .collect::<Result<Vec<_>>>()?;
227
228 self.data.state = CreateFlowState::CreateMetadata;
229 Ok(Status::executing(true))
230 }
231
232 async fn on_create_metadata(&mut self) -> Result<Status> {
237 let flow_id = self.data.flow_id.unwrap();
239 let (flow_info, flow_routes) = (&self.data).into();
240 if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
241 && self.data.task.or_replace
242 {
243 self.context
244 .flow_metadata_manager
245 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
246 .await?;
247 info!("Replaced flow metadata for flow {flow_id}");
248 self.data.did_replace = true;
249 } else {
250 self.context
251 .flow_metadata_manager
252 .create_flow_metadata(flow_id, flow_info, flow_routes)
253 .await?;
254 info!("Created flow metadata for flow {flow_id}");
255 }
256
257 self.data.state = CreateFlowState::InvalidateFlowCache;
258 Ok(Status::executing(true))
259 }
260
261 async fn on_broadcast(&mut self) -> Result<Status> {
262 debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
263 let flow_id = self.data.flow_id.unwrap();
265 let did_replace = self.data.did_replace;
266 let ctx = Context {
267 subject: Some("Invalidate flow cache by creating flow".to_string()),
268 };
269
270 let mut caches = vec![];
271
272 if did_replace {
274 let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
275
276 caches.extend([CacheIdent::DropFlow(DropFlow {
278 flow_id,
279 source_table_ids: old_flow_info.source_table_ids.clone(),
280 flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
281 })]);
282 }
283
284 let (_flow_info, flow_routes) = (&self.data).into();
285 let flow_part2peers = flow_routes
286 .into_iter()
287 .map(|(part_id, route)| (part_id, route.peer))
288 .collect();
289
290 caches.extend([
291 CacheIdent::CreateFlow(CreateFlow {
292 flow_id,
293 source_table_ids: self.data.source_table_ids.clone(),
294 partition_to_peer_mapping: flow_part2peers,
295 }),
296 CacheIdent::FlowId(flow_id),
297 ]);
298
299 self.context
300 .cache_invalidator
301 .invalidate(&ctx, &caches)
302 .await?;
303
304 Ok(Status::done_with_output(flow_id))
305 }
306}
307
308#[async_trait]
309impl Procedure for CreateFlowProcedure {
310 fn type_name(&self) -> &str {
311 Self::TYPE_NAME
312 }
313
314 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
315 let state = &self.data.state;
316
317 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
318 .with_label_values(&[state.as_ref()])
319 .start_timer();
320
321 match state {
322 CreateFlowState::Prepare => self.on_prepare().await,
323 CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
324 CreateFlowState::CreateMetadata => self.on_create_metadata().await,
325 CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
326 }
327 .map_err(map_to_procedure_error)
328 }
329
330 fn dump(&self) -> ProcedureResult<String> {
331 serde_json::to_string(&self.data).context(ToJsonSnafu)
332 }
333
334 fn lock_key(&self) -> LockKey {
335 let catalog_name = &self.data.task.catalog_name;
336 let flow_name = &self.data.task.flow_name;
337 let sink_table_name = &self.data.task.sink_table_name;
338
339 LockKey::new(vec![
340 CatalogLock::Read(catalog_name).into(),
341 TableNameLock::new(
342 &sink_table_name.catalog_name,
343 &sink_table_name.schema_name,
344 &sink_table_name.catalog_name,
345 )
346 .into(),
347 FlowNameLock::new(catalog_name, flow_name).into(),
348 ])
349 }
350}
351
352pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
353 let flow_type = flow_task
354 .flow_options
355 .get(FlowType::FLOW_TYPE_KEY)
356 .map(|s| s.as_str());
357 match flow_type {
358 Some(FlowType::BATCHING) => Ok(FlowType::Batching),
359 Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
360 Some(unknown) => UnexpectedSnafu {
361 err_msg: format!("Unknown flow type: {}", unknown),
362 }
363 .fail(),
364 None => Ok(FlowType::Batching),
365 }
366}
367
/// The state of [CreateFlowProcedure].
///
/// Steps run in the order `Prepare` -> `CreateFlows` -> `CreateMetadata` ->
/// `InvalidateFlowCache`; the declaration order below differs from that execution order.
/// The variant name (via `AsRefStr`) is used as the metric label in `execute`.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Validates the task and resolves the flow id, peers and flow type.
    Prepare,
    /// Sends the create request to every flownode peer.
    CreateFlows,
    /// Invalidates the flow caches; the final step.
    InvalidateFlowCache,
    /// Creates, or updates when replacing, the flow metadata.
    CreateMetadata,
}
380
381#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
383pub enum FlowType {
384 Batching,
386 Streaming,
388}
389
390impl FlowType {
391 pub const BATCHING: &str = "batching";
392 pub const STREAMING: &str = "streaming";
393 pub const FLOW_TYPE_KEY: &str = "flow_type";
394}
395
396impl Default for FlowType {
397 fn default() -> Self {
398 Self::Batching
399 }
400}
401
402impl fmt::Display for FlowType {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 match self {
405 FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
406 FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
407 }
408 }
409}
410
/// The serializable state of [CreateFlowProcedure]; this is what `dump` writes.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// The current step of the procedure.
    pub(crate) state: CreateFlowState,
    /// The create flow task as submitted.
    pub(crate) task: CreateFlowTask,
    /// The flow id, set during `Prepare` (the existing id when replacing).
    pub(crate) flow_id: Option<FlowId>,
    /// The flownode peers; partition `i` of the flow is routed to `peers[i]`.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the flow's source tables.
    pub(crate) source_table_ids: Vec<TableId>,
    /// The query context the flow is created with.
    // The alias also accepts the field name `query_context` on deserialization; presumably kept
    // for data dumped before a rename — confirm.
    #[serde(alias = "query_context")]
    pub(crate) flow_context: FlowQueryContext,
    /// The previous flow info (with its raw bytes) when an existing flow is being replaced;
    /// `None` otherwise.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Set once `CreateMetadata` replaced an existing flow's metadata; decides whether the old
    /// flow's cache is dropped. Defaults to `false` when absent from the serialized data.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// The resolved flow type; `None` until `Prepare` determines it.
    pub(crate) flow_type: Option<FlowType>,
}
431
432impl From<&CreateFlowData> for CreateRequest {
433 fn from(value: &CreateFlowData) -> Self {
434 let flow_id = value.flow_id.unwrap();
435 let source_table_ids = &value.source_table_ids;
436
437 let mut req = CreateRequest {
438 flow_id: Some(api::v1::FlowId { id: flow_id }),
439 source_table_ids: source_table_ids
440 .iter()
441 .map(|table_id| api::v1::TableId { id: *table_id })
442 .collect_vec(),
443 sink_table_name: Some(value.task.sink_table_name.clone().into()),
444 create_if_not_exists: true,
446 or_replace: value.task.or_replace,
447 expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
448 comment: value.task.comment.clone(),
449 sql: value.task.sql.clone(),
450 flow_options: value.task.flow_options.clone(),
451 };
452
453 let flow_type = value.flow_type.unwrap_or_default().to_string();
454 req.flow_options
455 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
456 req
457 }
458}
459
460impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
461 fn from(value: &CreateFlowData) -> Self {
462 let CreateFlowTask {
463 catalog_name,
464 flow_name,
465 sink_table_name,
466 expire_after,
467 comment,
468 sql,
469 flow_options: mut options,
470 ..
471 } = value.task.clone();
472
473 let flownode_ids = value
474 .peers
475 .iter()
476 .enumerate()
477 .map(|(idx, peer)| (idx as u32, peer.id))
478 .collect::<BTreeMap<_, _>>();
479 let flow_routes = value
480 .peers
481 .iter()
482 .enumerate()
483 .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
484 .collect::<Vec<_>>();
485
486 let flow_type = value.flow_type.unwrap_or_default().to_string();
487 options.insert("flow_type".to_string(), flow_type);
488
489 let mut create_time = chrono::Utc::now();
490 if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
491 && value.task.or_replace
492 {
493 create_time = prev_flow_value.get_inner_ref().created_time;
494 }
495
496 let flow_info: FlowInfoValue = FlowInfoValue {
497 source_table_ids: value.source_table_ids.clone(),
498 sink_table_name,
499 flownode_ids,
500 catalog_name,
501 query_context: Some(QueryContext::from(value.flow_context.clone())),
503 flow_name,
504 raw_sql: sql,
505 expire_after,
506 comment,
507 options,
508 created_time: create_time,
509 updated_time: chrono::Utc::now(),
510 };
511
512 (flow_info, flow_routes)
513 }
514}