common_meta/ddl/create_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, QueryContext};
51
/// The procedure of flow creation.
///
/// `data` is the serializable procedure state (it is what `Procedure::dump` writes),
/// while `context` carries the metadata managers, node manager and cache invalidator
/// the steps use; it is supplied again when the procedure is restored via `from_json`.
pub struct CreateFlowProcedure {
    pub context: DdlContext,
    pub data: CreateFlowData,
}
57
58impl CreateFlowProcedure {
59    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61    /// Returns a new [CreateFlowProcedure].
62    pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63        Self {
64            context,
65            data: CreateFlowData {
66                task,
67                flow_id: None,
68                peers: vec![],
69                source_table_ids: vec![],
70                query_context,
71                state: CreateFlowState::Prepare,
72                prev_flow_info_value: None,
73                did_replace: false,
74                flow_type: None,
75            },
76        }
77    }
78
79    /// Deserializes from `json`.
80    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82        Ok(CreateFlowProcedure { context, data })
83    }
84
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        let catalog_name = &self.data.task.catalog_name;
87        let flow_name = &self.data.task.flow_name;
88        let sink_table_name = &self.data.task.sink_table_name;
89        let create_if_not_exists = self.data.task.create_if_not_exists;
90        let or_replace = self.data.task.or_replace;
91
92        let flow_name_value = self
93            .context
94            .flow_metadata_manager
95            .flow_name_manager()
96            .get(catalog_name, flow_name)
97            .await?;
98
99        if create_if_not_exists && or_replace {
100            // this is forbidden because not clear what does that mean exactly
101            return error::UnsupportedSnafu {
102                operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
103            }
104            .fail();
105        }
106
107        if let Some(value) = flow_name_value {
108            ensure!(
109                create_if_not_exists || or_replace,
110                error::FlowAlreadyExistsSnafu {
111                    flow_name: format_full_flow_name(catalog_name, flow_name),
112                }
113            );
114
115            let flow_id = value.flow_id();
116            if create_if_not_exists {
117                info!("Flow already exists, flow_id: {}", flow_id);
118                return Ok(Status::done_with_output(flow_id));
119            }
120
121            let flow_id = value.flow_id();
122            let peers = self
123                .context
124                .flow_metadata_manager
125                .flow_route_manager()
126                .routes(flow_id)
127                .await?
128                .into_iter()
129                .map(|(_, value)| value.peer)
130                .collect::<Vec<_>>();
131            self.data.flow_id = Some(flow_id);
132            self.data.peers = peers;
133            info!("Replacing flow, flow_id: {}", flow_id);
134
135            let flow_info_value = self
136                .context
137                .flow_metadata_manager
138                .flow_info_manager()
139                .get_raw(flow_id)
140                .await?;
141
142            ensure!(
143                flow_info_value.is_some(),
144                error::FlowNotFoundSnafu {
145                    flow_name: format_full_flow_name(catalog_name, flow_name),
146                }
147            );
148
149            self.data.prev_flow_info_value = flow_info_value;
150        }
151
152        //  Ensures sink table doesn't exist.
153        let exists = self
154            .context
155            .table_metadata_manager
156            .table_name_manager()
157            .exists(TableNameKey::new(
158                &sink_table_name.catalog_name,
159                &sink_table_name.schema_name,
160                &sink_table_name.table_name,
161            ))
162            .await?;
163        // TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
164        // and require manually create sink table
165        if exists {
166            common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
167        }
168
169        self.collect_source_tables().await?;
170
171        // Validate that source and sink tables are not the same
172        let sink_table_name = &self.data.task.sink_table_name;
173        if self
174            .data
175            .task
176            .source_table_names
177            .iter()
178            .any(|source| source == sink_table_name)
179        {
180            return error::UnsupportedSnafu {
181                operation: format!(
182                    "Creating flow with source and sink table being the same: {}",
183                    sink_table_name
184                ),
185            }
186            .fail();
187        }
188
189        if self.data.flow_id.is_none() {
190            self.allocate_flow_id().await?;
191        }
192        self.data.state = CreateFlowState::CreateFlows;
193        // determine flow type
194        self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
195
196        Ok(Status::executing(true))
197    }
198
199    async fn on_flownode_create_flows(&mut self) -> Result<Status> {
200        // Safety: must be allocated.
201        let mut create_flow = Vec::with_capacity(self.data.peers.len());
202        for peer in &self.data.peers {
203            let requester = self.context.node_manager.flownode(peer).await;
204            let request = FlowRequest {
205                header: Some(FlowRequestHeader {
206                    tracing_context: TracingContext::from_current_span().to_w3c(),
207                    query_context: Some(self.data.query_context.clone().into()),
208                }),
209                body: Some(PbFlowRequest::Create((&self.data).into())),
210            };
211            create_flow.push(async move {
212                requester
213                    .handle(request)
214                    .await
215                    .map_err(add_peer_context_if_needed(peer.clone()))
216            });
217        }
218        info!(
219            "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
220            self.data.flow_id, self.data.flow_type, self.data.peers
221        );
222        join_all(create_flow)
223            .await
224            .into_iter()
225            .collect::<Result<Vec<_>>>()?;
226
227        self.data.state = CreateFlowState::CreateMetadata;
228        Ok(Status::executing(true))
229    }
230
231    /// Creates flow metadata.
232    ///
233    /// Abort(not-retry):
234    /// - Failed to create table metadata.
235    async fn on_create_metadata(&mut self) -> Result<Status> {
236        // Safety: The flow id must be allocated.
237        let flow_id = self.data.flow_id.unwrap();
238        let (flow_info, flow_routes) = (&self.data).into();
239        if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
240            && self.data.task.or_replace
241        {
242            self.context
243                .flow_metadata_manager
244                .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
245                .await?;
246            info!("Replaced flow metadata for flow {flow_id}");
247            self.data.did_replace = true;
248        } else {
249            self.context
250                .flow_metadata_manager
251                .create_flow_metadata(flow_id, flow_info, flow_routes)
252                .await?;
253            info!("Created flow metadata for flow {flow_id}");
254        }
255
256        self.data.state = CreateFlowState::InvalidateFlowCache;
257        Ok(Status::executing(true))
258    }
259
260    async fn on_broadcast(&mut self) -> Result<Status> {
261        debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
262        // Safety: The flow id must be allocated.
263        let flow_id = self.data.flow_id.unwrap();
264        let did_replace = self.data.did_replace;
265        let ctx = Context {
266            subject: Some("Invalidate flow cache by creating flow".to_string()),
267        };
268
269        let mut caches = vec![];
270
271        // if did replaced, invalidate the flow cache with drop the old flow
272        if did_replace {
273            let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
274
275            // only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
276            caches.extend([CacheIdent::DropFlow(DropFlow {
277                flow_id,
278                source_table_ids: old_flow_info.source_table_ids.clone(),
279                flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
280            })]);
281        }
282
283        let (_flow_info, flow_routes) = (&self.data).into();
284        let flow_part2peers = flow_routes
285            .into_iter()
286            .map(|(part_id, route)| (part_id, route.peer))
287            .collect();
288
289        caches.extend([
290            CacheIdent::CreateFlow(CreateFlow {
291                flow_id,
292                source_table_ids: self.data.source_table_ids.clone(),
293                partition_to_peer_mapping: flow_part2peers,
294            }),
295            CacheIdent::FlowId(flow_id),
296        ]);
297
298        self.context
299            .cache_invalidator
300            .invalidate(&ctx, &caches)
301            .await?;
302
303        Ok(Status::done_with_output(flow_id))
304    }
305}
306
307#[async_trait]
308impl Procedure for CreateFlowProcedure {
309    fn type_name(&self) -> &str {
310        Self::TYPE_NAME
311    }
312
313    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
314        let state = &self.data.state;
315
316        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
317            .with_label_values(&[state.as_ref()])
318            .start_timer();
319
320        match state {
321            CreateFlowState::Prepare => self.on_prepare().await,
322            CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
323            CreateFlowState::CreateMetadata => self.on_create_metadata().await,
324            CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
325        }
326        .map_err(map_to_procedure_error)
327    }
328
329    fn dump(&self) -> ProcedureResult<String> {
330        serde_json::to_string(&self.data).context(ToJsonSnafu)
331    }
332
333    fn lock_key(&self) -> LockKey {
334        let catalog_name = &self.data.task.catalog_name;
335        let flow_name = &self.data.task.flow_name;
336        let sink_table_name = &self.data.task.sink_table_name;
337
338        LockKey::new(vec![
339            CatalogLock::Read(catalog_name).into(),
340            TableNameLock::new(
341                &sink_table_name.catalog_name,
342                &sink_table_name.schema_name,
343                &sink_table_name.catalog_name,
344            )
345            .into(),
346            FlowNameLock::new(catalog_name, flow_name).into(),
347        ])
348    }
349}
350
351pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
352    let flow_type = flow_task
353        .flow_options
354        .get(FlowType::FLOW_TYPE_KEY)
355        .map(|s| s.as_str());
356    match flow_type {
357        Some(FlowType::BATCHING) => Ok(FlowType::Batching),
358        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
359        Some(unknown) => UnexpectedSnafu {
360            err_msg: format!("Unknown flow type: {}", unknown),
361        }
362        .fail(),
363        None => Ok(FlowType::Batching),
364    }
365}
366
/// The state of [CreateFlowProcedure].
///
/// Steps run in the order `Prepare` -> `CreateFlows` -> `CreateMetadata` ->
/// `InvalidateFlowCache`; note that the declaration order below differs from that.
/// The variant name is used as the metrics label of each step, and the state is part
/// of the data written by `dump`. Renaming a variant therefore likely breaks restoring
/// procedures dumped by older versions (TODO confirm against the procedure store).
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Prepares to create the flow: validates the request, collects source tables and
    /// allocates a flow id when one is not being reused.
    Prepare,
    /// Creates flows on the flownode.
    CreateFlows,
    /// Invalidates the flow caches; this is the last step.
    InvalidateFlowCache,
    /// Creates (or, for `OR REPLACE`, updates) the flow metadata.
    CreateMetadata,
}
379
/// The type of flow.
///
/// Selected through the `flow_type` flow option (see `get_flow_type_from_options`),
/// with `Batching` used when the option is absent. The resolved type is written back
/// into the options sent to flownodes and into the stored flow info.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum FlowType {
    /// The flow is a batching task.
    Batching,
    /// The flow is a streaming task.
    Streaming,
}
388
389impl FlowType {
390    pub const BATCHING: &str = "batching";
391    pub const STREAMING: &str = "streaming";
392    pub const FLOW_TYPE_KEY: &str = "flow_type";
393}
394
395impl Default for FlowType {
396    fn default() -> Self {
397        Self::Batching
398    }
399}
400
401impl fmt::Display for FlowType {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        match self {
404            FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
405            FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
406        }
407    }
408}
409
/// The serializable data of [CreateFlowProcedure] (what `Procedure::dump` persists).
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// The step to run next.
    pub(crate) state: CreateFlowState,
    /// The original create-flow task.
    pub(crate) task: CreateFlowTask,
    /// Set during `Prepare`: reused from the existing flow when replacing, otherwise
    /// allocated. Later steps unwrap it.
    pub(crate) flow_id: Option<FlowId>,
    /// Flownode peers of the flow; partition `i` is routed to `peers[i]`.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the source tables (presumably filled by `collect_source_tables` during
    /// `Prepare` — confirm in the `metadata` module).
    pub(crate) source_table_ids: Vec<TableId>,
    /// Sent in each flownode request header and stored in the flow info.
    pub(crate) query_context: QueryContext,
    /// For verify if prev value is consistent when need to update flow metadata.
    /// only set when `or_replace` is true.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Only set to true when replace actually happened.
    /// This decides whether the cache invalidation also drops the previous flow definition.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// Resolved from the task options during `Prepare`; `None` before that, in which
    /// case conversions fall back to `FlowType::default()`.
    pub(crate) flow_type: Option<FlowType>,
}
428
429impl From<&CreateFlowData> for CreateRequest {
430    fn from(value: &CreateFlowData) -> Self {
431        let flow_id = value.flow_id.unwrap();
432        let source_table_ids = &value.source_table_ids;
433
434        let mut req = CreateRequest {
435            flow_id: Some(api::v1::FlowId { id: flow_id }),
436            source_table_ids: source_table_ids
437                .iter()
438                .map(|table_id| api::v1::TableId { id: *table_id })
439                .collect_vec(),
440            sink_table_name: Some(value.task.sink_table_name.clone().into()),
441            // Always be true to ensure idempotent in case of retry
442            create_if_not_exists: true,
443            or_replace: value.task.or_replace,
444            expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
445            comment: value.task.comment.clone(),
446            sql: value.task.sql.clone(),
447            flow_options: value.task.flow_options.clone(),
448        };
449
450        let flow_type = value.flow_type.unwrap_or_default().to_string();
451        req.flow_options
452            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
453        req
454    }
455}
456
457impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
458    fn from(value: &CreateFlowData) -> Self {
459        let CreateFlowTask {
460            catalog_name,
461            flow_name,
462            sink_table_name,
463            expire_after,
464            comment,
465            sql,
466            flow_options: mut options,
467            ..
468        } = value.task.clone();
469
470        let flownode_ids = value
471            .peers
472            .iter()
473            .enumerate()
474            .map(|(idx, peer)| (idx as u32, peer.id))
475            .collect::<BTreeMap<_, _>>();
476        let flow_routes = value
477            .peers
478            .iter()
479            .enumerate()
480            .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
481            .collect::<Vec<_>>();
482
483        let flow_type = value.flow_type.unwrap_or_default().to_string();
484        options.insert("flow_type".to_string(), flow_type);
485
486        let mut create_time = chrono::Utc::now();
487        if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
488            && value.task.or_replace
489        {
490            create_time = prev_flow_value.get_inner_ref().created_time;
491        }
492
493        let flow_info: FlowInfoValue = FlowInfoValue {
494            source_table_ids: value.source_table_ids.clone(),
495            sink_table_name,
496            flownode_ids,
497            catalog_name,
498            query_context: Some(value.query_context.clone()),
499            flow_name,
500            raw_sql: sql,
501            expire_after,
502            comment,
503            options,
504            created_time: create_time,
505            updated_time: chrono::Utc::now(),
506        };
507
508        (flow_info, flow_routes)
509    }
510}