common_meta/ddl/
create_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
51
/// The procedure of flow creation.
///
/// Pairs the shared [DdlContext] with the serializable [CreateFlowData] that
/// records how far the procedure has progressed through [CreateFlowState].
pub struct CreateFlowProcedure {
    /// Managers and services the procedure calls into (flow/table metadata,
    /// flownode clients, cache invalidator).
    pub context: DdlContext,
    /// Serializable procedure state; this is what `dump` writes out and what
    /// `from_json` restores.
    pub data: CreateFlowData,
}
57
58impl CreateFlowProcedure {
    /// Type name returned by `Procedure::type_name` for this procedure.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61    /// Returns a new [CreateFlowProcedure].
62    pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63        Self {
64            context,
65            data: CreateFlowData {
66                task,
67                flow_id: None,
68                peers: vec![],
69                source_table_ids: vec![],
70                flow_context: query_context.into(), // Convert to FlowQueryContext
71                state: CreateFlowState::Prepare,
72                prev_flow_info_value: None,
73                did_replace: false,
74                flow_type: None,
75            },
76        }
77    }
78
79    /// Deserializes from `json`.
80    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82        Ok(CreateFlowProcedure { context, data })
83    }
84
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        let catalog_name = &self.data.task.catalog_name;
87        let flow_name = &self.data.task.flow_name;
88        let sink_table_name = &self.data.task.sink_table_name;
89        let create_if_not_exists = self.data.task.create_if_not_exists;
90        let or_replace = self.data.task.or_replace;
91
92        let flow_name_value = self
93            .context
94            .flow_metadata_manager
95            .flow_name_manager()
96            .get(catalog_name, flow_name)
97            .await?;
98
99        if create_if_not_exists && or_replace {
100            // this is forbidden because not clear what does that mean exactly
101            return error::UnsupportedSnafu {
102                operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
103            }
104            .fail();
105        }
106
107        if let Some(value) = flow_name_value {
108            ensure!(
109                create_if_not_exists || or_replace,
110                error::FlowAlreadyExistsSnafu {
111                    flow_name: format_full_flow_name(catalog_name, flow_name),
112                }
113            );
114
115            let flow_id = value.flow_id();
116            if create_if_not_exists {
117                info!("Flow already exists, flow_id: {}", flow_id);
118                return Ok(Status::done_with_output(flow_id));
119            }
120
121            let flow_id = value.flow_id();
122            let peers = self
123                .context
124                .flow_metadata_manager
125                .flow_route_manager()
126                .routes(flow_id)
127                .await?
128                .into_iter()
129                .map(|(_, value)| value.peer)
130                .collect::<Vec<_>>();
131            self.data.flow_id = Some(flow_id);
132            self.data.peers = peers;
133            info!("Replacing flow, flow_id: {}", flow_id);
134
135            let flow_info_value = self
136                .context
137                .flow_metadata_manager
138                .flow_info_manager()
139                .get_raw(flow_id)
140                .await?;
141
142            ensure!(
143                flow_info_value.is_some(),
144                error::FlowNotFoundSnafu {
145                    flow_name: format_full_flow_name(catalog_name, flow_name),
146                }
147            );
148
149            self.data.prev_flow_info_value = flow_info_value;
150        }
151
152        //  Ensures sink table doesn't exist.
153        let exists = self
154            .context
155            .table_metadata_manager
156            .table_name_manager()
157            .exists(TableNameKey::new(
158                &sink_table_name.catalog_name,
159                &sink_table_name.schema_name,
160                &sink_table_name.table_name,
161            ))
162            .await?;
163        // TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
164        // and require manually create sink table
165        if exists {
166            common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
167        }
168
169        self.collect_source_tables().await?;
170
171        // Validate that source and sink tables are not the same
172        let sink_table_name = &self.data.task.sink_table_name;
173        if self
174            .data
175            .task
176            .source_table_names
177            .iter()
178            .any(|source| source == sink_table_name)
179        {
180            return error::UnsupportedSnafu {
181                operation: format!(
182                    "Creating flow with source and sink table being the same: {}",
183                    sink_table_name
184                ),
185            }
186            .fail();
187        }
188
189        if self.data.flow_id.is_none() {
190            self.allocate_flow_id().await?;
191        }
192        self.data.state = CreateFlowState::CreateFlows;
193        // determine flow type
194        self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
195
196        Ok(Status::executing(true))
197    }
198
199    async fn on_flownode_create_flows(&mut self) -> Result<Status> {
200        // Safety: must be allocated.
201        let mut create_flow = Vec::with_capacity(self.data.peers.len());
202        for peer in &self.data.peers {
203            let requester = self.context.node_manager.flownode(peer).await;
204            let request = FlowRequest {
205                header: Some(FlowRequestHeader {
206                    tracing_context: TracingContext::from_current_span().to_w3c(),
207                    // Convert FlowQueryContext to QueryContext
208                    query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
209                }),
210                body: Some(PbFlowRequest::Create((&self.data).into())),
211            };
212            create_flow.push(async move {
213                requester
214                    .handle(request)
215                    .await
216                    .map_err(add_peer_context_if_needed(peer.clone()))
217            });
218        }
219        info!(
220            "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
221            self.data.flow_id, self.data.flow_type, self.data.peers
222        );
223        join_all(create_flow)
224            .await
225            .into_iter()
226            .collect::<Result<Vec<_>>>()?;
227
228        self.data.state = CreateFlowState::CreateMetadata;
229        Ok(Status::executing(true))
230    }
231
232    /// Creates flow metadata.
233    ///
234    /// Abort(not-retry):
235    /// - Failed to create table metadata.
236    async fn on_create_metadata(&mut self) -> Result<Status> {
237        // Safety: The flow id must be allocated.
238        let flow_id = self.data.flow_id.unwrap();
239        let (flow_info, flow_routes) = (&self.data).into();
240        if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
241            && self.data.task.or_replace
242        {
243            self.context
244                .flow_metadata_manager
245                .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
246                .await?;
247            info!("Replaced flow metadata for flow {flow_id}");
248            self.data.did_replace = true;
249        } else {
250            self.context
251                .flow_metadata_manager
252                .create_flow_metadata(flow_id, flow_info, flow_routes)
253                .await?;
254            info!("Created flow metadata for flow {flow_id}");
255        }
256
257        self.data.state = CreateFlowState::InvalidateFlowCache;
258        Ok(Status::executing(true))
259    }
260
261    async fn on_broadcast(&mut self) -> Result<Status> {
262        debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
263        // Safety: The flow id must be allocated.
264        let flow_id = self.data.flow_id.unwrap();
265        let did_replace = self.data.did_replace;
266        let ctx = Context {
267            subject: Some("Invalidate flow cache by creating flow".to_string()),
268        };
269
270        let mut caches = vec![];
271
272        // if did replaced, invalidate the flow cache with drop the old flow
273        if did_replace {
274            let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
275
276            // only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
277            caches.extend([CacheIdent::DropFlow(DropFlow {
278                flow_id,
279                source_table_ids: old_flow_info.source_table_ids.clone(),
280                flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
281            })]);
282        }
283
284        let (_flow_info, flow_routes) = (&self.data).into();
285        let flow_part2peers = flow_routes
286            .into_iter()
287            .map(|(part_id, route)| (part_id, route.peer))
288            .collect();
289
290        caches.extend([
291            CacheIdent::CreateFlow(CreateFlow {
292                flow_id,
293                source_table_ids: self.data.source_table_ids.clone(),
294                partition_to_peer_mapping: flow_part2peers,
295            }),
296            CacheIdent::FlowId(flow_id),
297        ]);
298
299        self.context
300            .cache_invalidator
301            .invalidate(&ctx, &caches)
302            .await?;
303
304        Ok(Status::done_with_output(flow_id))
305    }
306}
307
308#[async_trait]
309impl Procedure for CreateFlowProcedure {
310    fn type_name(&self) -> &str {
311        Self::TYPE_NAME
312    }
313
314    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
315        let state = &self.data.state;
316
317        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
318            .with_label_values(&[state.as_ref()])
319            .start_timer();
320
321        match state {
322            CreateFlowState::Prepare => self.on_prepare().await,
323            CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
324            CreateFlowState::CreateMetadata => self.on_create_metadata().await,
325            CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
326        }
327        .map_err(map_to_procedure_error)
328    }
329
330    fn dump(&self) -> ProcedureResult<String> {
331        serde_json::to_string(&self.data).context(ToJsonSnafu)
332    }
333
334    fn lock_key(&self) -> LockKey {
335        let catalog_name = &self.data.task.catalog_name;
336        let flow_name = &self.data.task.flow_name;
337        let sink_table_name = &self.data.task.sink_table_name;
338
339        LockKey::new(vec![
340            CatalogLock::Read(catalog_name).into(),
341            TableNameLock::new(
342                &sink_table_name.catalog_name,
343                &sink_table_name.schema_name,
344                &sink_table_name.catalog_name,
345            )
346            .into(),
347            FlowNameLock::new(catalog_name, flow_name).into(),
348        ])
349    }
350}
351
352pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
353    let flow_type = flow_task
354        .flow_options
355        .get(FlowType::FLOW_TYPE_KEY)
356        .map(|s| s.as_str());
357    match flow_type {
358        Some(FlowType::BATCHING) => Ok(FlowType::Batching),
359        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
360        Some(unknown) => UnexpectedSnafu {
361            err_msg: format!("Unknown flow type: {}", unknown),
362        }
363        .fail(),
364        None => Ok(FlowType::Batching),
365    }
366}
367
/// The state of [CreateFlowProcedure].
///
/// Note that the declaration order below is not the execution order. The
/// procedure advances `Prepare` -> `CreateFlows` -> `CreateMetadata` ->
/// `InvalidateFlowCache`.
///
/// The variant name (via `AsRefStr`) is also used as the metric label for
/// each step.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Prepares to create the flow.
    Prepare,
    /// Creates flows on the flownode.
    CreateFlows,
    /// Invalidate flow cache. This is the final step, run after the metadata
    /// has been created.
    InvalidateFlowCache,
    /// Create metadata. Runs after the flows were created on the flownodes.
    CreateMetadata,
}
380
/// The type of flow.
///
/// `Batching` is the default, both for [Default] and when the flow options
/// carry no flow type entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum FlowType {
    /// The flow is a batching task.
    Batching,
    /// The flow is a streaming task.
    Streaming,
}
389
impl FlowType {
    /// Option value (and `Display` text) for [FlowType::Batching].
    pub const BATCHING: &str = "batching";
    /// Option value (and `Display` text) for [FlowType::Streaming].
    pub const STREAMING: &str = "streaming";
    /// Key under which the flow type is stored in the flow options.
    pub const FLOW_TYPE_KEY: &str = "flow_type";
}
395
396impl Default for FlowType {
397    fn default() -> Self {
398        Self::Batching
399    }
400}
401
402impl fmt::Display for FlowType {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        match self {
405            FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
406            FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
407        }
408    }
409}
410
/// The serializable data.
///
/// This is the part of [CreateFlowProcedure] that `dump` serializes to JSON
/// and `from_json` restores.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// Current step of the procedure; selects the handler run by `execute`.
    pub(crate) state: CreateFlowState,
    /// The create-flow request being executed.
    pub(crate) task: CreateFlowTask,
    /// Id of the flow. `None` until the prepare step either reuses the id of
    /// the flow being replaced or allocates one.
    pub(crate) flow_id: Option<FlowId>,
    /// Flownode peers the flow is created on. For `OR REPLACE` these are the
    /// peers of the existing routes.
    // NOTE(review): for a new flow this is presumably filled by
    // `allocate_flow_id` (not visible here) — confirm.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the flow's source tables.
    // NOTE(review): presumably filled by `collect_source_tables` (not visible
    // here) — confirm.
    pub(crate) source_table_ids: Vec<TableId>,
    /// Use alias for backward compatibility with QueryContext serialized data
    #[serde(alias = "query_context")]
    pub(crate) flow_context: FlowQueryContext,
    /// For verify if prev value is consistent when need to update flow metadata.
    /// only set when `or_replace` is true.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Only set to true when replace actually happened.
    /// This is used to determine whether to invalidate the cache.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// Flow type resolved from the task's flow options during the prepare
    /// step; `None` before that. Consumers fall back to the default type.
    pub(crate) flow_type: Option<FlowType>,
}
431
432impl From<&CreateFlowData> for CreateRequest {
433    fn from(value: &CreateFlowData) -> Self {
434        let flow_id = value.flow_id.unwrap();
435        let source_table_ids = &value.source_table_ids;
436
437        let mut req = CreateRequest {
438            flow_id: Some(api::v1::FlowId { id: flow_id }),
439            source_table_ids: source_table_ids
440                .iter()
441                .map(|table_id| api::v1::TableId { id: *table_id })
442                .collect_vec(),
443            sink_table_name: Some(value.task.sink_table_name.clone().into()),
444            // Always be true to ensure idempotent in case of retry
445            create_if_not_exists: true,
446            or_replace: value.task.or_replace,
447            expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
448            comment: value.task.comment.clone(),
449            sql: value.task.sql.clone(),
450            flow_options: value.task.flow_options.clone(),
451        };
452
453        let flow_type = value.flow_type.unwrap_or_default().to_string();
454        req.flow_options
455            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
456        req
457    }
458}
459
460impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
461    fn from(value: &CreateFlowData) -> Self {
462        let CreateFlowTask {
463            catalog_name,
464            flow_name,
465            sink_table_name,
466            expire_after,
467            comment,
468            sql,
469            flow_options: mut options,
470            ..
471        } = value.task.clone();
472
473        let flownode_ids = value
474            .peers
475            .iter()
476            .enumerate()
477            .map(|(idx, peer)| (idx as u32, peer.id))
478            .collect::<BTreeMap<_, _>>();
479        let flow_routes = value
480            .peers
481            .iter()
482            .enumerate()
483            .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
484            .collect::<Vec<_>>();
485
486        let flow_type = value.flow_type.unwrap_or_default().to_string();
487        options.insert("flow_type".to_string(), flow_type);
488
489        let mut create_time = chrono::Utc::now();
490        if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
491            && value.task.or_replace
492        {
493            create_time = prev_flow_value.get_inner_ref().created_time;
494        }
495
496        let flow_info: FlowInfoValue = FlowInfoValue {
497            source_table_ids: value.source_table_ids.clone(),
498            sink_table_name,
499            flownode_ids,
500            catalog_name,
501            // Convert FlowQueryContext back to QueryContext for storage
502            query_context: Some(QueryContext::from(value.flow_context.clone())),
503            flow_name,
504            raw_sql: sql,
505            expire_after,
506            comment,
507            options,
508            created_time: create_time,
509            updated_time: chrono::Utc::now(),
510        };
511
512        (flow_info, flow_routes)
513    }
514}