common_meta/ddl/
create_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, QueryContext};
51
52/// The procedure of flow creation.
53pub struct CreateFlowProcedure {
54    pub context: DdlContext,
55    pub data: CreateFlowData,
56}
57
58impl CreateFlowProcedure {
59    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61    /// Returns a new [CreateFlowProcedure].
62    pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63        Self {
64            context,
65            data: CreateFlowData {
66                task,
67                flow_id: None,
68                peers: vec![],
69                source_table_ids: vec![],
70                query_context,
71                state: CreateFlowState::Prepare,
72                prev_flow_info_value: None,
73                flow_type: None,
74            },
75        }
76    }
77
78    /// Deserializes from `json`.
79    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
80        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
81        Ok(CreateFlowProcedure { context, data })
82    }
83
84    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
85        let catalog_name = &self.data.task.catalog_name;
86        let flow_name = &self.data.task.flow_name;
87        let sink_table_name = &self.data.task.sink_table_name;
88        let create_if_not_exists = self.data.task.create_if_not_exists;
89        let or_replace = self.data.task.or_replace;
90
91        let flow_name_value = self
92            .context
93            .flow_metadata_manager
94            .flow_name_manager()
95            .get(catalog_name, flow_name)
96            .await?;
97
98        if create_if_not_exists && or_replace {
99            // this is forbidden because not clear what does that mean exactly
100            return error::UnsupportedSnafu {
101                operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
102            }
103            .fail();
104        }
105
106        if let Some(value) = flow_name_value {
107            ensure!(
108                create_if_not_exists || or_replace,
109                error::FlowAlreadyExistsSnafu {
110                    flow_name: format_full_flow_name(catalog_name, flow_name),
111                }
112            );
113
114            let flow_id = value.flow_id();
115            if create_if_not_exists {
116                info!("Flow already exists, flow_id: {}", flow_id);
117                return Ok(Status::done_with_output(flow_id));
118            }
119
120            let flow_id = value.flow_id();
121            let peers = self
122                .context
123                .flow_metadata_manager
124                .flow_route_manager()
125                .routes(flow_id)
126                .await?
127                .into_iter()
128                .map(|(_, value)| value.peer)
129                .collect::<Vec<_>>();
130            self.data.flow_id = Some(flow_id);
131            self.data.peers = peers;
132            info!("Replacing flow, flow_id: {}", flow_id);
133
134            let flow_info_value = self
135                .context
136                .flow_metadata_manager
137                .flow_info_manager()
138                .get_raw(flow_id)
139                .await?;
140
141            ensure!(
142                flow_info_value.is_some(),
143                error::FlowNotFoundSnafu {
144                    flow_name: format_full_flow_name(catalog_name, flow_name),
145                }
146            );
147
148            self.data.prev_flow_info_value = flow_info_value;
149        }
150
151        //  Ensures sink table doesn't exist.
152        let exists = self
153            .context
154            .table_metadata_manager
155            .table_name_manager()
156            .exists(TableNameKey::new(
157                &sink_table_name.catalog_name,
158                &sink_table_name.schema_name,
159                &sink_table_name.table_name,
160            ))
161            .await?;
162        // TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
163        // and require manually create sink table
164        if exists {
165            common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
166        }
167
168        self.collect_source_tables().await?;
169        if self.data.flow_id.is_none() {
170            self.allocate_flow_id().await?;
171        }
172        self.data.state = CreateFlowState::CreateFlows;
173        // determine flow type
174        self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
175
176        Ok(Status::executing(true))
177    }
178
179    async fn on_flownode_create_flows(&mut self) -> Result<Status> {
180        // Safety: must be allocated.
181        let mut create_flow = Vec::with_capacity(self.data.peers.len());
182        for peer in &self.data.peers {
183            let requester = self.context.node_manager.flownode(peer).await;
184            let request = FlowRequest {
185                header: Some(FlowRequestHeader {
186                    tracing_context: TracingContext::from_current_span().to_w3c(),
187                    query_context: Some(self.data.query_context.clone().into()),
188                }),
189                body: Some(PbFlowRequest::Create((&self.data).into())),
190            };
191            create_flow.push(async move {
192                requester
193                    .handle(request)
194                    .await
195                    .map_err(add_peer_context_if_needed(peer.clone()))
196            });
197        }
198        info!(
199            "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
200            self.data.flow_id, self.data.flow_type, self.data.peers
201        );
202        join_all(create_flow)
203            .await
204            .into_iter()
205            .collect::<Result<Vec<_>>>()?;
206
207        self.data.state = CreateFlowState::CreateMetadata;
208        Ok(Status::executing(true))
209    }
210
211    /// Creates flow metadata.
212    ///
213    /// Abort(not-retry):
214    /// - Failed to create table metadata.
215    async fn on_create_metadata(&mut self) -> Result<Status> {
216        // Safety: The flow id must be allocated.
217        let flow_id = self.data.flow_id.unwrap();
218        let (flow_info, flow_routes) = (&self.data).into();
219        if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
220            && self.data.task.or_replace
221        {
222            self.context
223                .flow_metadata_manager
224                .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
225                .await?;
226            info!("Replaced flow metadata for flow {flow_id}");
227        } else {
228            self.context
229                .flow_metadata_manager
230                .create_flow_metadata(flow_id, flow_info, flow_routes)
231                .await?;
232            info!("Created flow metadata for flow {flow_id}");
233        }
234
235        self.data.state = CreateFlowState::InvalidateFlowCache;
236        Ok(Status::executing(true))
237    }
238
239    async fn on_broadcast(&mut self) -> Result<Status> {
240        debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
241        // Safety: The flow id must be allocated.
242        let flow_id = self.data.flow_id.unwrap();
243        let ctx = Context {
244            subject: Some("Invalidate flow cache by creating flow".to_string()),
245        };
246
247        self.context
248            .cache_invalidator
249            .invalidate(
250                &ctx,
251                &[
252                    CacheIdent::CreateFlow(CreateFlow {
253                        source_table_ids: self.data.source_table_ids.clone(),
254                        flownodes: self.data.peers.clone(),
255                    }),
256                    CacheIdent::FlowId(flow_id),
257                ],
258            )
259            .await?;
260
261        Ok(Status::done_with_output(flow_id))
262    }
263}
264
265#[async_trait]
266impl Procedure for CreateFlowProcedure {
267    fn type_name(&self) -> &str {
268        Self::TYPE_NAME
269    }
270
271    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
272        let state = &self.data.state;
273
274        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
275            .with_label_values(&[state.as_ref()])
276            .start_timer();
277
278        match state {
279            CreateFlowState::Prepare => self.on_prepare().await,
280            CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
281            CreateFlowState::CreateMetadata => self.on_create_metadata().await,
282            CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
283        }
284        .map_err(handle_retry_error)
285    }
286
287    fn dump(&self) -> ProcedureResult<String> {
288        serde_json::to_string(&self.data).context(ToJsonSnafu)
289    }
290
291    fn lock_key(&self) -> LockKey {
292        let catalog_name = &self.data.task.catalog_name;
293        let flow_name = &self.data.task.flow_name;
294        let sink_table_name = &self.data.task.sink_table_name;
295
296        LockKey::new(vec![
297            CatalogLock::Read(catalog_name).into(),
298            TableNameLock::new(
299                &sink_table_name.catalog_name,
300                &sink_table_name.schema_name,
301                &sink_table_name.catalog_name,
302            )
303            .into(),
304            FlowNameLock::new(catalog_name, flow_name).into(),
305        ])
306    }
307}
308
309pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
310    let flow_type = flow_task
311        .flow_options
312        .get(FlowType::FLOW_TYPE_KEY)
313        .map(|s| s.as_str());
314    match flow_type {
315        Some(FlowType::BATCHING) => Ok(FlowType::Batching),
316        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
317        Some(unknown) => UnexpectedSnafu {
318            err_msg: format!("Unknown flow type: {}", unknown),
319        }
320        .fail(),
321        None => Ok(FlowType::Batching),
322    }
323}
324
325/// The state of [CreateFlowProcedure].
326#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
327pub enum CreateFlowState {
328    /// Prepares to create the flow.
329    Prepare,
330    /// Creates flows on the flownode.
331    CreateFlows,
332    /// Invalidate flow cache.
333    InvalidateFlowCache,
334    /// Create metadata.
335    CreateMetadata,
336}
337
338/// The type of flow.
339#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
340pub enum FlowType {
341    /// The flow is a batching task.
342    Batching,
343    /// The flow is a streaming task.
344    Streaming,
345}
346
347impl FlowType {
348    pub const BATCHING: &str = "batching";
349    pub const STREAMING: &str = "streaming";
350    pub const FLOW_TYPE_KEY: &str = "flow_type";
351}
352
353impl Default for FlowType {
354    fn default() -> Self {
355        Self::Batching
356    }
357}
358
359impl fmt::Display for FlowType {
360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        match self {
362            FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
363            FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
364        }
365    }
366}
367
368/// The serializable data.
369#[derive(Debug, Serialize, Deserialize)]
370pub struct CreateFlowData {
371    pub(crate) state: CreateFlowState,
372    pub(crate) task: CreateFlowTask,
373    pub(crate) flow_id: Option<FlowId>,
374    pub(crate) peers: Vec<Peer>,
375    pub(crate) source_table_ids: Vec<TableId>,
376    pub(crate) query_context: QueryContext,
377    /// For verify if prev value is consistent when need to update flow metadata.
378    /// only set when `or_replace` is true.
379    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
380    pub(crate) flow_type: Option<FlowType>,
381}
382
383impl From<&CreateFlowData> for CreateRequest {
384    fn from(value: &CreateFlowData) -> Self {
385        let flow_id = value.flow_id.unwrap();
386        let source_table_ids = &value.source_table_ids;
387
388        let mut req = CreateRequest {
389            flow_id: Some(api::v1::FlowId { id: flow_id }),
390            source_table_ids: source_table_ids
391                .iter()
392                .map(|table_id| api::v1::TableId { id: *table_id })
393                .collect_vec(),
394            sink_table_name: Some(value.task.sink_table_name.clone().into()),
395            // Always be true to ensure idempotent in case of retry
396            create_if_not_exists: true,
397            or_replace: value.task.or_replace,
398            expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
399            comment: value.task.comment.clone(),
400            sql: value.task.sql.clone(),
401            flow_options: value.task.flow_options.clone(),
402        };
403
404        let flow_type = value.flow_type.unwrap_or_default().to_string();
405        req.flow_options
406            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
407        req
408    }
409}
410
411impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
412    fn from(value: &CreateFlowData) -> Self {
413        let CreateFlowTask {
414            catalog_name,
415            flow_name,
416            sink_table_name,
417            expire_after,
418            comment,
419            sql,
420            flow_options: mut options,
421            ..
422        } = value.task.clone();
423
424        let flownode_ids = value
425            .peers
426            .iter()
427            .enumerate()
428            .map(|(idx, peer)| (idx as u32, peer.id))
429            .collect::<BTreeMap<_, _>>();
430        let flow_routes = value
431            .peers
432            .iter()
433            .enumerate()
434            .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
435            .collect::<Vec<_>>();
436
437        let flow_type = value.flow_type.unwrap_or_default().to_string();
438        options.insert("flow_type".to_string(), flow_type);
439
440        let mut create_time = chrono::Utc::now();
441        if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
442            && value.task.or_replace
443        {
444            create_time = prev_flow_value.get_inner_ref().created_time;
445        }
446
447        let flow_info: FlowInfoValue = FlowInfoValue {
448            source_table_ids: value.source_table_ids.clone(),
449            sink_table_name,
450            flownode_ids,
451            catalog_name,
452            query_context: Some(value.query_context.clone()),
453            flow_name,
454            raw_sql: sql,
455            expire_after,
456            comment,
457            options,
458            created_time: create_time,
459            updated_time: chrono::Utc::now(),
460        };
461
462        (flow_info, flow_routes)
463    }
464}