1mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, QueryContext};
51
/// The procedure that creates a flow, or replaces an existing one when the task
/// specifies `OR REPLACE`.
pub struct CreateFlowProcedure {
    /// DDL dependencies used by the steps (flow/table metadata managers, node
    /// manager for reaching flownodes, cache invalidator).
    pub context: DdlContext,
    /// Procedure state; this is what `dump` serializes and `from_json` restores.
    pub data: CreateFlowData,
}
57
58impl CreateFlowProcedure {
59 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61 pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63 Self {
64 context,
65 data: CreateFlowData {
66 task,
67 flow_id: None,
68 peers: vec![],
69 source_table_ids: vec![],
70 query_context,
71 state: CreateFlowState::Prepare,
72 prev_flow_info_value: None,
73 did_replace: false,
74 flow_type: None,
75 },
76 }
77 }
78
79 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82 Ok(CreateFlowProcedure { context, data })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 let catalog_name = &self.data.task.catalog_name;
87 let flow_name = &self.data.task.flow_name;
88 let sink_table_name = &self.data.task.sink_table_name;
89 let create_if_not_exists = self.data.task.create_if_not_exists;
90 let or_replace = self.data.task.or_replace;
91
92 let flow_name_value = self
93 .context
94 .flow_metadata_manager
95 .flow_name_manager()
96 .get(catalog_name, flow_name)
97 .await?;
98
99 if create_if_not_exists && or_replace {
100 return error::UnsupportedSnafu {
102 operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
103 }
104 .fail();
105 }
106
107 if let Some(value) = flow_name_value {
108 ensure!(
109 create_if_not_exists || or_replace,
110 error::FlowAlreadyExistsSnafu {
111 flow_name: format_full_flow_name(catalog_name, flow_name),
112 }
113 );
114
115 let flow_id = value.flow_id();
116 if create_if_not_exists {
117 info!("Flow already exists, flow_id: {}", flow_id);
118 return Ok(Status::done_with_output(flow_id));
119 }
120
121 let flow_id = value.flow_id();
122 let peers = self
123 .context
124 .flow_metadata_manager
125 .flow_route_manager()
126 .routes(flow_id)
127 .await?
128 .into_iter()
129 .map(|(_, value)| value.peer)
130 .collect::<Vec<_>>();
131 self.data.flow_id = Some(flow_id);
132 self.data.peers = peers;
133 info!("Replacing flow, flow_id: {}", flow_id);
134
135 let flow_info_value = self
136 .context
137 .flow_metadata_manager
138 .flow_info_manager()
139 .get_raw(flow_id)
140 .await?;
141
142 ensure!(
143 flow_info_value.is_some(),
144 error::FlowNotFoundSnafu {
145 flow_name: format_full_flow_name(catalog_name, flow_name),
146 }
147 );
148
149 self.data.prev_flow_info_value = flow_info_value;
150 }
151
152 let exists = self
154 .context
155 .table_metadata_manager
156 .table_name_manager()
157 .exists(TableNameKey::new(
158 &sink_table_name.catalog_name,
159 &sink_table_name.schema_name,
160 &sink_table_name.table_name,
161 ))
162 .await?;
163 if exists {
166 common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
167 }
168
169 self.collect_source_tables().await?;
170 if self.data.flow_id.is_none() {
171 self.allocate_flow_id().await?;
172 }
173 self.data.state = CreateFlowState::CreateFlows;
174 self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
176
177 Ok(Status::executing(true))
178 }
179
180 async fn on_flownode_create_flows(&mut self) -> Result<Status> {
181 let mut create_flow = Vec::with_capacity(self.data.peers.len());
183 for peer in &self.data.peers {
184 let requester = self.context.node_manager.flownode(peer).await;
185 let request = FlowRequest {
186 header: Some(FlowRequestHeader {
187 tracing_context: TracingContext::from_current_span().to_w3c(),
188 query_context: Some(self.data.query_context.clone().into()),
189 }),
190 body: Some(PbFlowRequest::Create((&self.data).into())),
191 };
192 create_flow.push(async move {
193 requester
194 .handle(request)
195 .await
196 .map_err(add_peer_context_if_needed(peer.clone()))
197 });
198 }
199 info!(
200 "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
201 self.data.flow_id, self.data.flow_type, self.data.peers
202 );
203 join_all(create_flow)
204 .await
205 .into_iter()
206 .collect::<Result<Vec<_>>>()?;
207
208 self.data.state = CreateFlowState::CreateMetadata;
209 Ok(Status::executing(true))
210 }
211
212 async fn on_create_metadata(&mut self) -> Result<Status> {
217 let flow_id = self.data.flow_id.unwrap();
219 let (flow_info, flow_routes) = (&self.data).into();
220 if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
221 && self.data.task.or_replace
222 {
223 self.context
224 .flow_metadata_manager
225 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
226 .await?;
227 info!("Replaced flow metadata for flow {flow_id}");
228 self.data.did_replace = true;
229 } else {
230 self.context
231 .flow_metadata_manager
232 .create_flow_metadata(flow_id, flow_info, flow_routes)
233 .await?;
234 info!("Created flow metadata for flow {flow_id}");
235 }
236
237 self.data.state = CreateFlowState::InvalidateFlowCache;
238 Ok(Status::executing(true))
239 }
240
241 async fn on_broadcast(&mut self) -> Result<Status> {
242 debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
243 let flow_id = self.data.flow_id.unwrap();
245 let did_replace = self.data.did_replace;
246 let ctx = Context {
247 subject: Some("Invalidate flow cache by creating flow".to_string()),
248 };
249
250 let mut caches = vec![];
251
252 if did_replace {
254 let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
255
256 caches.extend([CacheIdent::DropFlow(DropFlow {
258 flow_id,
259 source_table_ids: old_flow_info.source_table_ids.clone(),
260 flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
261 })]);
262 }
263
264 let (_flow_info, flow_routes) = (&self.data).into();
265 let flow_part2peers = flow_routes
266 .into_iter()
267 .map(|(part_id, route)| (part_id, route.peer))
268 .collect();
269
270 caches.extend([
271 CacheIdent::CreateFlow(CreateFlow {
272 flow_id,
273 source_table_ids: self.data.source_table_ids.clone(),
274 partition_to_peer_mapping: flow_part2peers,
275 }),
276 CacheIdent::FlowId(flow_id),
277 ]);
278
279 self.context
280 .cache_invalidator
281 .invalidate(&ctx, &caches)
282 .await?;
283
284 Ok(Status::done_with_output(flow_id))
285 }
286}
287
288#[async_trait]
289impl Procedure for CreateFlowProcedure {
290 fn type_name(&self) -> &str {
291 Self::TYPE_NAME
292 }
293
294 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
295 let state = &self.data.state;
296
297 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
298 .with_label_values(&[state.as_ref()])
299 .start_timer();
300
301 match state {
302 CreateFlowState::Prepare => self.on_prepare().await,
303 CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
304 CreateFlowState::CreateMetadata => self.on_create_metadata().await,
305 CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
306 }
307 .map_err(map_to_procedure_error)
308 }
309
310 fn dump(&self) -> ProcedureResult<String> {
311 serde_json::to_string(&self.data).context(ToJsonSnafu)
312 }
313
314 fn lock_key(&self) -> LockKey {
315 let catalog_name = &self.data.task.catalog_name;
316 let flow_name = &self.data.task.flow_name;
317 let sink_table_name = &self.data.task.sink_table_name;
318
319 LockKey::new(vec![
320 CatalogLock::Read(catalog_name).into(),
321 TableNameLock::new(
322 &sink_table_name.catalog_name,
323 &sink_table_name.schema_name,
324 &sink_table_name.catalog_name,
325 )
326 .into(),
327 FlowNameLock::new(catalog_name, flow_name).into(),
328 ])
329 }
330}
331
332pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
333 let flow_type = flow_task
334 .flow_options
335 .get(FlowType::FLOW_TYPE_KEY)
336 .map(|s| s.as_str());
337 match flow_type {
338 Some(FlowType::BATCHING) => Ok(FlowType::Batching),
339 Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
340 Some(unknown) => UnexpectedSnafu {
341 err_msg: format!("Unknown flow type: {}", unknown),
342 }
343 .fail(),
344 None => Ok(FlowType::Batching),
345 }
346}
347
348#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
350pub enum CreateFlowState {
351 Prepare,
353 CreateFlows,
355 InvalidateFlowCache,
357 CreateMetadata,
359}
360
361#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
363pub enum FlowType {
364 Batching,
366 Streaming,
368}
369
370impl FlowType {
371 pub const BATCHING: &str = "batching";
372 pub const STREAMING: &str = "streaming";
373 pub const FLOW_TYPE_KEY: &str = "flow_type";
374}
375
376impl Default for FlowType {
377 fn default() -> Self {
378 Self::Batching
379 }
380}
381
382impl fmt::Display for FlowType {
383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384 match self {
385 FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
386 FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
387 }
388 }
389}
390
/// Persistent state of [`CreateFlowProcedure`].
///
/// Serialized by `dump` and restored by `from_json`. Field order determines the
/// serialized JSON layout.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// Current step of the procedure.
    pub(crate) state: CreateFlowState,
    /// The original create-flow task.
    pub(crate) task: CreateFlowTask,
    /// Id of the flow. `None` until `Prepare` reuses the existing id (`OR REPLACE`)
    /// or allocates a new one.
    pub(crate) flow_id: Option<FlowId>,
    /// Flownodes the flow runs on; partition `i` is routed to `peers[i]`.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the tables the flow reads from.
    pub(crate) source_table_ids: Vec<TableId>,
    /// Query context forwarded to flownodes and stored in the flow info.
    pub(crate) query_context: QueryContext,
    /// Raw info value of the flow being replaced; set in `Prepare` only on the
    /// `OR REPLACE` path.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Whether `CreateMetadata` updated existing metadata rather than creating new
    /// metadata. Missing in serialized data means `false` (`serde(default)`),
    /// presumably so procedures dumped before this field existed still load.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// Flow type resolved in `Prepare`; `None` before that. Consumers fall back to
    /// `FlowType::default()` via `unwrap_or_default`.
    pub(crate) flow_type: Option<FlowType>,
}
409
410impl From<&CreateFlowData> for CreateRequest {
411 fn from(value: &CreateFlowData) -> Self {
412 let flow_id = value.flow_id.unwrap();
413 let source_table_ids = &value.source_table_ids;
414
415 let mut req = CreateRequest {
416 flow_id: Some(api::v1::FlowId { id: flow_id }),
417 source_table_ids: source_table_ids
418 .iter()
419 .map(|table_id| api::v1::TableId { id: *table_id })
420 .collect_vec(),
421 sink_table_name: Some(value.task.sink_table_name.clone().into()),
422 create_if_not_exists: true,
424 or_replace: value.task.or_replace,
425 expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
426 comment: value.task.comment.clone(),
427 sql: value.task.sql.clone(),
428 flow_options: value.task.flow_options.clone(),
429 };
430
431 let flow_type = value.flow_type.unwrap_or_default().to_string();
432 req.flow_options
433 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
434 req
435 }
436}
437
438impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
439 fn from(value: &CreateFlowData) -> Self {
440 let CreateFlowTask {
441 catalog_name,
442 flow_name,
443 sink_table_name,
444 expire_after,
445 comment,
446 sql,
447 flow_options: mut options,
448 ..
449 } = value.task.clone();
450
451 let flownode_ids = value
452 .peers
453 .iter()
454 .enumerate()
455 .map(|(idx, peer)| (idx as u32, peer.id))
456 .collect::<BTreeMap<_, _>>();
457 let flow_routes = value
458 .peers
459 .iter()
460 .enumerate()
461 .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
462 .collect::<Vec<_>>();
463
464 let flow_type = value.flow_type.unwrap_or_default().to_string();
465 options.insert("flow_type".to_string(), flow_type);
466
467 let mut create_time = chrono::Utc::now();
468 if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
469 && value.task.or_replace
470 {
471 create_time = prev_flow_value.get_inner_ref().created_time;
472 }
473
474 let flow_info: FlowInfoValue = FlowInfoValue {
475 source_table_ids: value.source_table_ids.clone(),
476 sink_table_name,
477 flownode_ids,
478 catalog_name,
479 query_context: Some(value.query_context.clone()),
480 flow_name,
481 raw_sql: sql,
482 expire_after,
483 comment,
484 options,
485 created_time: create_time,
486 updated_time: chrono::Utc::now(),
487 };
488
489 (flow_info, flow_routes)
490 }
491}