1mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, QueryContext};
51
/// Procedure that creates a flow, or replaces an existing one.
///
/// It pairs the shared [`DdlContext`] with the serializable
/// [`CreateFlowData`]; only `data` is written out by `dump` and read back by
/// [`CreateFlowProcedure::from_json`].
pub struct CreateFlowProcedure {
    /// Shared DDL context; this file uses its flow/table metadata managers,
    /// node manager and cache invalidator.
    pub context: DdlContext,
    /// Serializable state of the procedure (task, progress and collected ids).
    pub data: CreateFlowData,
}
57
58impl CreateFlowProcedure {
59 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61 pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63 Self {
64 context,
65 data: CreateFlowData {
66 task,
67 flow_id: None,
68 peers: vec![],
69 source_table_ids: vec![],
70 query_context,
71 state: CreateFlowState::Prepare,
72 prev_flow_info_value: None,
73 did_replace: false,
74 flow_type: None,
75 },
76 }
77 }
78
79 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82 Ok(CreateFlowProcedure { context, data })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 let catalog_name = &self.data.task.catalog_name;
87 let flow_name = &self.data.task.flow_name;
88 let sink_table_name = &self.data.task.sink_table_name;
89 let create_if_not_exists = self.data.task.create_if_not_exists;
90 let or_replace = self.data.task.or_replace;
91
92 let flow_name_value = self
93 .context
94 .flow_metadata_manager
95 .flow_name_manager()
96 .get(catalog_name, flow_name)
97 .await?;
98
99 if create_if_not_exists && or_replace {
100 return error::UnsupportedSnafu {
102 operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
103 }
104 .fail();
105 }
106
107 if let Some(value) = flow_name_value {
108 ensure!(
109 create_if_not_exists || or_replace,
110 error::FlowAlreadyExistsSnafu {
111 flow_name: format_full_flow_name(catalog_name, flow_name),
112 }
113 );
114
115 let flow_id = value.flow_id();
116 if create_if_not_exists {
117 info!("Flow already exists, flow_id: {}", flow_id);
118 return Ok(Status::done_with_output(flow_id));
119 }
120
121 let flow_id = value.flow_id();
122 let peers = self
123 .context
124 .flow_metadata_manager
125 .flow_route_manager()
126 .routes(flow_id)
127 .await?
128 .into_iter()
129 .map(|(_, value)| value.peer)
130 .collect::<Vec<_>>();
131 self.data.flow_id = Some(flow_id);
132 self.data.peers = peers;
133 info!("Replacing flow, flow_id: {}", flow_id);
134
135 let flow_info_value = self
136 .context
137 .flow_metadata_manager
138 .flow_info_manager()
139 .get_raw(flow_id)
140 .await?;
141
142 ensure!(
143 flow_info_value.is_some(),
144 error::FlowNotFoundSnafu {
145 flow_name: format_full_flow_name(catalog_name, flow_name),
146 }
147 );
148
149 self.data.prev_flow_info_value = flow_info_value;
150 }
151
152 let exists = self
154 .context
155 .table_metadata_manager
156 .table_name_manager()
157 .exists(TableNameKey::new(
158 &sink_table_name.catalog_name,
159 &sink_table_name.schema_name,
160 &sink_table_name.table_name,
161 ))
162 .await?;
163 if exists {
166 common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
167 }
168
169 self.collect_source_tables().await?;
170
171 let sink_table_name = &self.data.task.sink_table_name;
173 if self
174 .data
175 .task
176 .source_table_names
177 .iter()
178 .any(|source| source == sink_table_name)
179 {
180 return error::UnsupportedSnafu {
181 operation: format!(
182 "Creating flow with source and sink table being the same: {}",
183 sink_table_name
184 ),
185 }
186 .fail();
187 }
188
189 if self.data.flow_id.is_none() {
190 self.allocate_flow_id().await?;
191 }
192 self.data.state = CreateFlowState::CreateFlows;
193 self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
195
196 Ok(Status::executing(true))
197 }
198
199 async fn on_flownode_create_flows(&mut self) -> Result<Status> {
200 let mut create_flow = Vec::with_capacity(self.data.peers.len());
202 for peer in &self.data.peers {
203 let requester = self.context.node_manager.flownode(peer).await;
204 let request = FlowRequest {
205 header: Some(FlowRequestHeader {
206 tracing_context: TracingContext::from_current_span().to_w3c(),
207 query_context: Some(self.data.query_context.clone().into()),
208 }),
209 body: Some(PbFlowRequest::Create((&self.data).into())),
210 };
211 create_flow.push(async move {
212 requester
213 .handle(request)
214 .await
215 .map_err(add_peer_context_if_needed(peer.clone()))
216 });
217 }
218 info!(
219 "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
220 self.data.flow_id, self.data.flow_type, self.data.peers
221 );
222 join_all(create_flow)
223 .await
224 .into_iter()
225 .collect::<Result<Vec<_>>>()?;
226
227 self.data.state = CreateFlowState::CreateMetadata;
228 Ok(Status::executing(true))
229 }
230
231 async fn on_create_metadata(&mut self) -> Result<Status> {
236 let flow_id = self.data.flow_id.unwrap();
238 let (flow_info, flow_routes) = (&self.data).into();
239 if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
240 && self.data.task.or_replace
241 {
242 self.context
243 .flow_metadata_manager
244 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
245 .await?;
246 info!("Replaced flow metadata for flow {flow_id}");
247 self.data.did_replace = true;
248 } else {
249 self.context
250 .flow_metadata_manager
251 .create_flow_metadata(flow_id, flow_info, flow_routes)
252 .await?;
253 info!("Created flow metadata for flow {flow_id}");
254 }
255
256 self.data.state = CreateFlowState::InvalidateFlowCache;
257 Ok(Status::executing(true))
258 }
259
260 async fn on_broadcast(&mut self) -> Result<Status> {
261 debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
262 let flow_id = self.data.flow_id.unwrap();
264 let did_replace = self.data.did_replace;
265 let ctx = Context {
266 subject: Some("Invalidate flow cache by creating flow".to_string()),
267 };
268
269 let mut caches = vec![];
270
271 if did_replace {
273 let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
274
275 caches.extend([CacheIdent::DropFlow(DropFlow {
277 flow_id,
278 source_table_ids: old_flow_info.source_table_ids.clone(),
279 flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
280 })]);
281 }
282
283 let (_flow_info, flow_routes) = (&self.data).into();
284 let flow_part2peers = flow_routes
285 .into_iter()
286 .map(|(part_id, route)| (part_id, route.peer))
287 .collect();
288
289 caches.extend([
290 CacheIdent::CreateFlow(CreateFlow {
291 flow_id,
292 source_table_ids: self.data.source_table_ids.clone(),
293 partition_to_peer_mapping: flow_part2peers,
294 }),
295 CacheIdent::FlowId(flow_id),
296 ]);
297
298 self.context
299 .cache_invalidator
300 .invalidate(&ctx, &caches)
301 .await?;
302
303 Ok(Status::done_with_output(flow_id))
304 }
305}
306
307#[async_trait]
308impl Procedure for CreateFlowProcedure {
309 fn type_name(&self) -> &str {
310 Self::TYPE_NAME
311 }
312
313 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
314 let state = &self.data.state;
315
316 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
317 .with_label_values(&[state.as_ref()])
318 .start_timer();
319
320 match state {
321 CreateFlowState::Prepare => self.on_prepare().await,
322 CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
323 CreateFlowState::CreateMetadata => self.on_create_metadata().await,
324 CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
325 }
326 .map_err(map_to_procedure_error)
327 }
328
329 fn dump(&self) -> ProcedureResult<String> {
330 serde_json::to_string(&self.data).context(ToJsonSnafu)
331 }
332
333 fn lock_key(&self) -> LockKey {
334 let catalog_name = &self.data.task.catalog_name;
335 let flow_name = &self.data.task.flow_name;
336 let sink_table_name = &self.data.task.sink_table_name;
337
338 LockKey::new(vec![
339 CatalogLock::Read(catalog_name).into(),
340 TableNameLock::new(
341 &sink_table_name.catalog_name,
342 &sink_table_name.schema_name,
343 &sink_table_name.catalog_name,
344 )
345 .into(),
346 FlowNameLock::new(catalog_name, flow_name).into(),
347 ])
348 }
349}
350
351pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
352 let flow_type = flow_task
353 .flow_options
354 .get(FlowType::FLOW_TYPE_KEY)
355 .map(|s| s.as_str());
356 match flow_type {
357 Some(FlowType::BATCHING) => Ok(FlowType::Batching),
358 Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
359 Some(unknown) => UnexpectedSnafu {
360 err_msg: format!("Unknown flow type: {}", unknown),
361 }
362 .fail(),
363 None => Ok(FlowType::Batching),
364 }
365}
366
/// The state of [`CreateFlowProcedure`].
///
/// The declaration order is not the execution order: the procedure moves
/// `Prepare` -> `CreateFlows` -> `CreateMetadata` -> `InvalidateFlowCache`.
/// The variant name (via `AsRefStr`) is used as the metric label in `execute`.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Validates the task and resolves flow id, peers and source tables.
    Prepare,
    /// Sends the create request to the flownodes.
    CreateFlows,
    /// Broadcasts the cache invalidation; the last step of the procedure.
    InvalidateFlowCache,
    /// Creates or replaces the flow metadata.
    CreateMetadata,
}
379
/// The type of a flow, selected through the `flow_type` flow option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum FlowType {
    /// Batching flow; also the default when no flow type is given.
    Batching,
    /// Streaming flow.
    Streaming,
}
388
389impl FlowType {
390 pub const BATCHING: &str = "batching";
391 pub const STREAMING: &str = "streaming";
392 pub const FLOW_TYPE_KEY: &str = "flow_type";
393}
394
395impl Default for FlowType {
396 fn default() -> Self {
397 Self::Batching
398 }
399}
400
401impl fmt::Display for FlowType {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 match self {
404 FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
405 FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
406 }
407 }
408}
409
/// The serializable data of [`CreateFlowProcedure`].
///
/// This is what `dump` writes and `from_json` reads back.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// The step the procedure executes next.
    pub(crate) state: CreateFlowState,
    /// The create-flow task being executed.
    pub(crate) task: CreateFlowTask,
    /// `None` until `on_prepare` takes over an existing flow id or allocates one.
    pub(crate) flow_id: Option<FlowId>,
    /// Flownode peers the flow is created on; the index in this list is used
    /// as the flow partition id when routes are built.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the source tables; sent to the flownodes and stored in the flow
    /// info. Presumably filled by `collect_source_tables` — TODO confirm.
    pub(crate) source_table_ids: Vec<TableId>,
    /// Query context forwarded in the request header and stored in the flow info.
    pub(crate) query_context: QueryContext,
    /// Raw flow info of the flow being replaced; recorded by `on_prepare` and
    /// passed as the previous value to `update_flow_metadata`.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Set once `update_flow_metadata` succeeded; `on_broadcast` then also
    /// invalidates the old flow. Deserializes to `false` when the field is
    /// missing from the JSON.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// Flow type resolved by `on_prepare`; `None` is treated as the default
    /// type by the conversions below.
    pub(crate) flow_type: Option<FlowType>,
}
428
429impl From<&CreateFlowData> for CreateRequest {
430 fn from(value: &CreateFlowData) -> Self {
431 let flow_id = value.flow_id.unwrap();
432 let source_table_ids = &value.source_table_ids;
433
434 let mut req = CreateRequest {
435 flow_id: Some(api::v1::FlowId { id: flow_id }),
436 source_table_ids: source_table_ids
437 .iter()
438 .map(|table_id| api::v1::TableId { id: *table_id })
439 .collect_vec(),
440 sink_table_name: Some(value.task.sink_table_name.clone().into()),
441 create_if_not_exists: true,
443 or_replace: value.task.or_replace,
444 expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
445 comment: value.task.comment.clone(),
446 sql: value.task.sql.clone(),
447 flow_options: value.task.flow_options.clone(),
448 };
449
450 let flow_type = value.flow_type.unwrap_or_default().to_string();
451 req.flow_options
452 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
453 req
454 }
455}
456
457impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
458 fn from(value: &CreateFlowData) -> Self {
459 let CreateFlowTask {
460 catalog_name,
461 flow_name,
462 sink_table_name,
463 expire_after,
464 comment,
465 sql,
466 flow_options: mut options,
467 ..
468 } = value.task.clone();
469
470 let flownode_ids = value
471 .peers
472 .iter()
473 .enumerate()
474 .map(|(idx, peer)| (idx as u32, peer.id))
475 .collect::<BTreeMap<_, _>>();
476 let flow_routes = value
477 .peers
478 .iter()
479 .enumerate()
480 .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
481 .collect::<Vec<_>>();
482
483 let flow_type = value.flow_type.unwrap_or_default().to_string();
484 options.insert("flow_type".to_string(), flow_type);
485
486 let mut create_time = chrono::Utc::now();
487 if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
488 && value.task.or_replace
489 {
490 create_time = prev_flow_value.get_inner_ref().created_time;
491 }
492
493 let flow_info: FlowInfoValue = FlowInfoValue {
494 source_table_ids: value.source_table_ids.clone(),
495 sink_table_name,
496 flownode_ids,
497 catalog_name,
498 query_context: Some(value.query_context.clone()),
499 flow_name,
500 raw_sql: sql,
501 expire_after,
502 comment,
503 options,
504 created_time: create_time,
505 updated_time: chrono::Utc::now(),
506 };
507
508 (flow_info, flow_routes)
509 }
510}