1mod metadata;
16
17use std::collections::{BTreeMap, HashMap};
18use std::fmt;
19
20use api::v1::ExpireAfter;
21use api::v1::flow::flow_request::Body as PbFlowRequest;
22use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use strum::AsRefStr;
36use table::metadata::TableId;
37use table::table_name::TableName;
38
39use crate::cache_invalidator::Context;
40use crate::ddl::DdlContext;
41use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
42use crate::error::{self, Result, UnexpectedSnafu};
43use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
44use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
45use crate::key::flow::flow_route::FlowRouteValue;
46use crate::key::table_name::TableNameKey;
47use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
48use crate::lock_key::{CatalogLock, FlowNameLock};
49use crate::metrics;
50use crate::peer::Peer;
51use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
52
/// DDL procedure that creates a flow, or replaces an existing one on `OR REPLACE`.
pub struct CreateFlowProcedure {
    /// Runtime dependencies used by the steps (flow/table metadata managers, node manager,
    /// cache invalidator).
    pub context: DdlContext,
    /// Serializable procedure state; this is what `dump` writes and `from_json` restores.
    pub data: CreateFlowData,
}
58
59impl CreateFlowProcedure {
60 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
61
62 pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
64 Self {
65 context,
66 data: CreateFlowData {
67 task,
68 flow_id: None,
69 peers: vec![],
70 source_table_ids: vec![],
71 unresolved_source_table_names: vec![],
72 flow_context: query_context.into(), state: CreateFlowState::Prepare,
74 prev_flow_info_value: None,
75 did_replace: false,
76 flow_type: None,
77 },
78 }
79 }
80
81 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
83 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
84 Ok(CreateFlowProcedure { context, data })
85 }
86
87 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
88 let catalog_name = &self.data.task.catalog_name;
89 let flow_name = &self.data.task.flow_name;
90 let sink_table_name = &self.data.task.sink_table_name;
91 let create_if_not_exists = self.data.task.create_if_not_exists;
92 let or_replace = self.data.task.or_replace;
93
94 validate_flow_options(&self.data.task)?;
95
96 let flow_name_value = self
97 .context
98 .flow_metadata_manager
99 .flow_name_manager()
100 .get(catalog_name, flow_name)
101 .await?;
102
103 if create_if_not_exists && or_replace {
104 return error::UnsupportedSnafu {
106 operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
107 }
108 .fail();
109 }
110
111 if let Some(value) = flow_name_value {
112 ensure!(
113 create_if_not_exists || or_replace,
114 error::FlowAlreadyExistsSnafu {
115 flow_name: format_full_flow_name(catalog_name, flow_name),
116 }
117 );
118
119 let flow_id = value.flow_id();
120 if create_if_not_exists {
121 info!("Flow already exists, flow_id: {}", flow_id);
122 return Ok(Status::done_with_output(flow_id));
123 }
124
125 let flow_id = value.flow_id();
126 let peers = self
127 .context
128 .flow_metadata_manager
129 .flow_route_manager()
130 .routes(flow_id)
131 .await?
132 .into_iter()
133 .map(|(_, value)| value.peer)
134 .collect::<Vec<_>>();
135 self.data.flow_id = Some(flow_id);
136 self.data.peers = peers;
137 info!("Replacing flow, flow_id: {}", flow_id);
138
139 let flow_info_value = self
140 .context
141 .flow_metadata_manager
142 .flow_info_manager()
143 .get_raw(flow_id)
144 .await?;
145
146 ensure!(
147 flow_info_value.is_some(),
148 error::FlowNotFoundSnafu {
149 flow_name: format_full_flow_name(catalog_name, flow_name),
150 }
151 );
152
153 self.data.prev_flow_info_value = flow_info_value;
154 }
155
156 let exists = self
158 .context
159 .table_metadata_manager
160 .table_name_manager()
161 .exists(TableNameKey::new(
162 &sink_table_name.catalog_name,
163 &sink_table_name.schema_name,
164 &sink_table_name.table_name,
165 ))
166 .await?;
167 if exists {
170 common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
171 }
172
173 self.collect_source_tables().await?;
174 ensure!(
175 self.data.unresolved_source_table_names.is_empty()
176 || defer_on_missing_source(&self.data.task)?,
177 error::UnsupportedSnafu {
178 operation: format!(
179 "Create flow with missing source tables requires WITH ('{DEFER_ON_MISSING_SOURCE_KEY}'='true'): {}",
180 self.data
181 .unresolved_source_table_names
182 .iter()
183 .map(ToString::to_string)
184 .join(", ")
185 )
186 }
187 );
188 self.ensure_supported_replace_transition()?;
189
190 let sink_table_name = &self.data.task.sink_table_name;
192 if self
193 .data
194 .task
195 .source_table_names
196 .iter()
197 .any(|source| source == sink_table_name)
198 {
199 return error::UnsupportedSnafu {
200 operation: format!(
201 "Creating flow with source and sink table being the same: {}",
202 sink_table_name
203 ),
204 }
205 .fail();
206 }
207
208 if self.data.flow_id.is_none() {
209 self.allocate_flow_id().await?;
210 }
211 self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
212
213 self.data.state = if self.data.is_pending() {
214 self.data.peers.clear();
215 CreateFlowState::CreateMetadata
216 } else {
217 CreateFlowState::CreateFlows
218 };
219
220 Ok(Status::executing(true))
221 }
222
223 fn ensure_supported_replace_transition(&self) -> Result<()> {
224 if !self.data.task.or_replace {
225 return Ok(());
226 }
227
228 let Some(prev_flow_info) = self.data.prev_flow_info_value.as_ref() else {
229 return Ok(());
230 };
231 let prev_pending = prev_flow_info.get_inner_ref().is_pending();
232 let new_pending = self.data.is_pending();
233 ensure!(
234 prev_pending == new_pending,
235 error::UnsupportedSnafu {
236 operation: "Replacing between pending and active flow states is not supported yet"
237 }
238 );
239
240 Ok(())
241 }
242
243 async fn on_flownode_create_flows(&mut self) -> Result<Status> {
244 let mut create_flow = Vec::with_capacity(self.data.peers.len());
246 for peer in &self.data.peers {
247 let requester = self.context.node_manager.flownode(peer).await;
248 let request = FlowRequest {
249 header: Some(FlowRequestHeader {
250 tracing_context: TracingContext::from_current_span().to_w3c(),
251 query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
253 }),
254 body: Some(PbFlowRequest::Create((&self.data).into())),
255 };
256 create_flow.push(async move {
257 requester
258 .handle(request)
259 .await
260 .map_err(add_peer_context_if_needed(peer.clone()))
261 });
262 }
263 info!(
264 "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
265 self.data.flow_id, self.data.flow_type, self.data.peers
266 );
267 join_all(create_flow)
268 .await
269 .into_iter()
270 .collect::<Result<Vec<_>>>()?;
271
272 self.data.state = CreateFlowState::CreateMetadata;
273 Ok(Status::executing(true))
274 }
275
276 async fn on_create_metadata(&mut self) -> Result<Status> {
281 let flow_id = self.data.flow_id.unwrap();
283 let (flow_info, flow_routes) = (&self.data).into();
284 if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
285 && self.data.task.or_replace
286 {
287 self.context
288 .flow_metadata_manager
289 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
290 .await?;
291 info!("Replaced flow metadata for flow {flow_id}");
292 self.data.did_replace = true;
293 } else {
294 self.context
295 .flow_metadata_manager
296 .create_flow_metadata(flow_id, flow_info, flow_routes)
297 .await?;
298 info!("Created flow metadata for flow {flow_id}");
299 }
300
301 self.data.state = CreateFlowState::InvalidateFlowCache;
302 Ok(Status::executing(true))
303 }
304
305 async fn on_broadcast(&mut self) -> Result<Status> {
306 debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
307 let flow_id = self.data.flow_id.unwrap();
309 let did_replace = self.data.did_replace;
310 let ctx = Context {
311 subject: Some("Invalidate flow cache by creating flow".to_string()),
312 };
313
314 let mut caches = vec![];
315
316 if did_replace {
318 let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
319
320 caches.extend([CacheIdent::DropFlow(DropFlow {
322 flow_id,
323 source_table_ids: old_flow_info.source_table_ids.clone(),
324 flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
325 })]);
326 }
327
328 let (_flow_info, flow_routes) = (&self.data).into();
329 let flow_part2peers = flow_routes
330 .into_iter()
331 .map(|(part_id, route)| (part_id, route.peer))
332 .collect();
333
334 caches.extend([
335 CacheIdent::CreateFlow(CreateFlow {
336 flow_id,
337 source_table_ids: self.data.source_table_ids.clone(),
338 partition_to_peer_mapping: flow_part2peers,
339 }),
340 CacheIdent::FlowId(flow_id),
341 ]);
342
343 self.context
344 .cache_invalidator
345 .invalidate(&ctx, &caches)
346 .await?;
347
348 Ok(Status::done_with_output(flow_id))
349 }
350}
351
352#[async_trait]
353impl Procedure for CreateFlowProcedure {
354 fn type_name(&self) -> &str {
355 Self::TYPE_NAME
356 }
357
358 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
359 let state = &self.data.state;
360
361 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
362 .with_label_values(&[state.as_ref()])
363 .start_timer();
364
365 match state {
366 CreateFlowState::Prepare => self.on_prepare().await,
367 CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
368 CreateFlowState::CreateMetadata => self.on_create_metadata().await,
369 CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
370 }
371 .map_err(map_to_procedure_error)
372 }
373
374 fn dump(&self) -> ProcedureResult<String> {
375 serde_json::to_string(&self.data).context(ToJsonSnafu)
376 }
377
378 fn lock_key(&self) -> LockKey {
379 let catalog_name = &self.data.task.catalog_name;
380 let flow_name = &self.data.task.flow_name;
381
382 LockKey::new(vec![
383 CatalogLock::Read(catalog_name).into(),
384 FlowNameLock::new(catalog_name, flow_name).into(),
385 ])
386 }
387}
388
389pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
390 let flow_type = flow_task
391 .flow_options
392 .get(FlowType::FLOW_TYPE_KEY)
393 .map(|s| s.as_str());
394 match flow_type {
395 Some(FlowType::BATCHING) => Ok(FlowType::Batching),
396 Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
397 Some(unknown) => UnexpectedSnafu {
398 err_msg: format!("Unknown flow type: {}", unknown),
399 }
400 .fail(),
401 None => Ok(FlowType::Batching),
402 }
403}
404
405pub const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
407
408pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
409 flow_task
410 .flow_options
411 .get(DEFER_ON_MISSING_SOURCE_KEY)
412 .map(|value| {
413 value
414 .trim()
415 .to_ascii_lowercase()
416 .parse::<bool>()
417 .map_err(|_| {
418 error::UnexpectedSnafu {
419 err_msg: format!(
420 "Invalid flow option '{DEFER_ON_MISSING_SOURCE_KEY}': {value}"
421 ),
422 }
423 .build()
424 })
425 })
426 .transpose()
427 .map(|value| value.unwrap_or(false))
428}
429
430pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
431 for key in flow_task.flow_options.keys() {
432 match key.as_str() {
433 DEFER_ON_MISSING_SOURCE_KEY
434 | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY
435 | FlowType::FLOW_TYPE_KEY => {}
436 unknown => {
437 return UnexpectedSnafu {
438 err_msg: format!(
439 "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}"
440 ),
441 }
442 .fail();
443 }
444 }
445 }
446
447 defer_on_missing_source(flow_task)?;
448 get_flow_type_from_options(flow_task)?;
449 Ok(())
450}
451
452fn user_runtime_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
453 let mut options = options.clone();
454 options.remove(DEFER_ON_MISSING_SOURCE_KEY);
455 options
456}
457
/// Returns the flow options to persist in the flow metadata.
///
/// Nothing is stripped, so every user option (including `defer_on_missing_source`) is kept.
fn metadata_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
    options
        .iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
461
/// Steps of [`CreateFlowProcedure`]. The variant name is also used as the metric label.
///
/// Execution order: `Prepare` -> `CreateFlows` (active flows only) -> `CreateMetadata`
/// -> `InvalidateFlowCache`. The declaration order below differs from that order.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Validates the request and resolves flow id, peers, source tables and flow type.
    Prepare,
    /// Sends create requests to the flownodes in `peers`.
    CreateFlows,
    /// Invalidates flow caches; the final step, which finishes with the flow id.
    InvalidateFlowCache,
    /// Creates the flow metadata, or updates it when replacing an existing flow.
    CreateMetadata,
}
474
475#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
477pub enum FlowType {
478 #[default]
480 Batching,
481 Streaming,
483}
484
485pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str =
486 "experimental_enable_incremental_read";
487
488impl FlowType {
489 pub const BATCHING: &str = "batching";
490 pub const STREAMING: &str = "streaming";
491 pub const FLOW_TYPE_KEY: &str = "flow_type";
492}
493
494impl fmt::Display for FlowType {
495 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496 match self {
497 FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
498 FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
499 }
500 }
501}
502
/// Serializable state of a [`CreateFlowProcedure`]; dumped as JSON and restored on recovery.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// Current step of the procedure.
    pub(crate) state: CreateFlowState,
    /// The create-flow request being executed.
    pub(crate) task: CreateFlowTask,
    /// Flow id. Reused from the existing flow on `OR REPLACE`; otherwise allocated during
    /// `Prepare` (see `allocate_flow_id`).
    pub(crate) flow_id: Option<FlowId>,
    /// Flownodes the flow runs on. Partition `i` is routed to `peers[i]`. Cleared for
    /// pending flows before metadata is written.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the resolved source tables (presumably filled by `collect_source_tables`).
    pub(crate) source_table_ids: Vec<TableId>,
    /// Source tables that could not be resolved. A non-empty list means the flow is pending.
    /// Defaults to empty when the field is absent from a dump.
    #[serde(default)]
    pub(crate) unresolved_source_table_names: Vec<TableName>,
    /// Query context of the flow. Also deserialized from a field named `query_context`.
    #[serde(alias = "query_context")]
    pub(crate) flow_context: FlowQueryContext,
    /// Flow info read during `Prepare` when replacing an existing flow (`OR REPLACE`).
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Set once the metadata step replaced an existing flow. The cache step then also drops
    /// the old flow's entries. Defaults to `false` when absent from a dump.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// Flow type resolved from the options during `Prepare`.
    pub(crate) flow_type: Option<FlowType>,
}
525
526impl CreateFlowData {
527 pub(crate) fn is_pending(&self) -> bool {
528 !self.unresolved_source_table_names.is_empty()
529 }
530
531 pub(crate) fn is_active(&self) -> bool {
532 !self.is_pending()
533 }
534}
535
536impl From<&CreateFlowData> for CreateRequest {
537 fn from(value: &CreateFlowData) -> Self {
538 let flow_id = value.flow_id.unwrap();
539 let source_table_ids = &value.source_table_ids;
540
541 let mut req = CreateRequest {
542 flow_id: Some(api::v1::FlowId { id: flow_id }),
543 source_table_ids: source_table_ids
544 .iter()
545 .map(|table_id| api::v1::TableId { id: *table_id })
546 .collect_vec(),
547 sink_table_name: Some(value.task.sink_table_name.clone().into()),
548 create_if_not_exists: true,
550 or_replace: value.task.or_replace,
551 expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
552 eval_interval: value
553 .task
554 .eval_interval_secs
555 .map(|seconds| api::v1::EvalInterval { seconds }),
556 comment: value.task.comment.clone(),
557 sql: value.task.sql.clone(),
558 flow_options: user_runtime_flow_options(&value.task.flow_options),
559 };
560
561 let flow_type = value.flow_type.unwrap_or_default().to_string();
562 req.flow_options
563 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
564 req
565 }
566}
567
568impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
569 fn from(value: &CreateFlowData) -> Self {
570 let CreateFlowTask {
571 catalog_name,
572 flow_name,
573 sink_table_name,
574 expire_after,
575 eval_interval_secs: eval_interval,
576 comment,
577 sql,
578 ..
579 } = value.task.clone();
580 let mut options = metadata_flow_options(&value.task.flow_options);
581
582 let flownode_ids = value
583 .peers
584 .iter()
585 .enumerate()
586 .map(|(idx, peer)| (idx as u32, peer.id))
587 .collect::<BTreeMap<_, _>>();
588 let flow_routes = value
589 .peers
590 .iter()
591 .enumerate()
592 .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
593 .collect::<Vec<_>>();
594
595 let flow_type = value.flow_type.unwrap_or_default().to_string();
596 options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
597
598 let mut create_time = chrono::Utc::now();
599 if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
600 && value.task.or_replace
601 {
602 create_time = prev_flow_value.get_inner_ref().created_time;
603 }
604
605 let flow_info: FlowInfoValue = FlowInfoValue {
606 source_table_ids: value.source_table_ids.clone(),
607 all_source_table_names: value.task.source_table_names.clone(),
608 unresolved_source_table_names: value.unresolved_source_table_names.clone(),
609 sink_table_name,
610 flownode_ids,
611 catalog_name,
612 query_context: Some(QueryContext::from(value.flow_context.clone())),
614 flow_name,
615 raw_sql: sql,
616 expire_after,
617 eval_interval_secs: eval_interval,
618 comment,
619 options,
620 status: if value.is_active() {
621 FlowStatus::Active
622 } else {
623 FlowStatus::PendingSources
624 },
625 created_time: create_time,
626 updated_time: chrono::Utc::now(),
627 };
628
629 (flow_info, flow_routes)
630 }
631}