common_meta/ddl/
create_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16
17use std::collections::BTreeMap;
18use std::fmt;
19
20use api::v1::flow::flow_request::Body as PbFlowRequest;
21use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
22use api::v1::ExpireAfter;
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ensure, ResultExt};
35use strum::AsRefStr;
36use table::metadata::TableId;
37
38use crate::cache_invalidator::Context;
39use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
40use crate::ddl::DdlContext;
41use crate::error::{self, Result, UnexpectedSnafu};
42use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
43use crate::key::flow::flow_info::FlowInfoValue;
44use crate::key::flow::flow_route::FlowRouteValue;
45use crate::key::table_name::TableNameKey;
46use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
47use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
48use crate::metrics;
49use crate::peer::Peer;
50use crate::rpc::ddl::{CreateFlowTask, QueryContext};
51
52/// The procedure of flow creation.
53pub struct CreateFlowProcedure {
54    pub context: DdlContext,
55    pub data: CreateFlowData,
56}
57
58impl CreateFlowProcedure {
59    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
60
61    /// Returns a new [CreateFlowProcedure].
62    pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
63        Self {
64            context,
65            data: CreateFlowData {
66                task,
67                flow_id: None,
68                peers: vec![],
69                source_table_ids: vec![],
70                query_context,
71                state: CreateFlowState::Prepare,
72                prev_flow_info_value: None,
73                did_replace: false,
74                flow_type: None,
75            },
76        }
77    }
78
79    /// Deserializes from `json`.
80    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82        Ok(CreateFlowProcedure { context, data })
83    }
84
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        let catalog_name = &self.data.task.catalog_name;
87        let flow_name = &self.data.task.flow_name;
88        let sink_table_name = &self.data.task.sink_table_name;
89        let create_if_not_exists = self.data.task.create_if_not_exists;
90        let or_replace = self.data.task.or_replace;
91
92        let flow_name_value = self
93            .context
94            .flow_metadata_manager
95            .flow_name_manager()
96            .get(catalog_name, flow_name)
97            .await?;
98
99        if create_if_not_exists && or_replace {
100            // this is forbidden because not clear what does that mean exactly
101            return error::UnsupportedSnafu {
102                operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
103            }
104            .fail();
105        }
106
107        if let Some(value) = flow_name_value {
108            ensure!(
109                create_if_not_exists || or_replace,
110                error::FlowAlreadyExistsSnafu {
111                    flow_name: format_full_flow_name(catalog_name, flow_name),
112                }
113            );
114
115            let flow_id = value.flow_id();
116            if create_if_not_exists {
117                info!("Flow already exists, flow_id: {}", flow_id);
118                return Ok(Status::done_with_output(flow_id));
119            }
120
121            let flow_id = value.flow_id();
122            let peers = self
123                .context
124                .flow_metadata_manager
125                .flow_route_manager()
126                .routes(flow_id)
127                .await?
128                .into_iter()
129                .map(|(_, value)| value.peer)
130                .collect::<Vec<_>>();
131            self.data.flow_id = Some(flow_id);
132            self.data.peers = peers;
133            info!("Replacing flow, flow_id: {}", flow_id);
134
135            let flow_info_value = self
136                .context
137                .flow_metadata_manager
138                .flow_info_manager()
139                .get_raw(flow_id)
140                .await?;
141
142            ensure!(
143                flow_info_value.is_some(),
144                error::FlowNotFoundSnafu {
145                    flow_name: format_full_flow_name(catalog_name, flow_name),
146                }
147            );
148
149            self.data.prev_flow_info_value = flow_info_value;
150        }
151
152        //  Ensures sink table doesn't exist.
153        let exists = self
154            .context
155            .table_metadata_manager
156            .table_name_manager()
157            .exists(TableNameKey::new(
158                &sink_table_name.catalog_name,
159                &sink_table_name.schema_name,
160                &sink_table_name.table_name,
161            ))
162            .await?;
163        // TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
164        // and require manually create sink table
165        if exists {
166            common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
167        }
168
169        self.collect_source_tables().await?;
170        if self.data.flow_id.is_none() {
171            self.allocate_flow_id().await?;
172        }
173        self.data.state = CreateFlowState::CreateFlows;
174        // determine flow type
175        self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
176
177        Ok(Status::executing(true))
178    }
179
180    async fn on_flownode_create_flows(&mut self) -> Result<Status> {
181        // Safety: must be allocated.
182        let mut create_flow = Vec::with_capacity(self.data.peers.len());
183        for peer in &self.data.peers {
184            let requester = self.context.node_manager.flownode(peer).await;
185            let request = FlowRequest {
186                header: Some(FlowRequestHeader {
187                    tracing_context: TracingContext::from_current_span().to_w3c(),
188                    query_context: Some(self.data.query_context.clone().into()),
189                }),
190                body: Some(PbFlowRequest::Create((&self.data).into())),
191            };
192            create_flow.push(async move {
193                requester
194                    .handle(request)
195                    .await
196                    .map_err(add_peer_context_if_needed(peer.clone()))
197            });
198        }
199        info!(
200            "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
201            self.data.flow_id, self.data.flow_type, self.data.peers
202        );
203        join_all(create_flow)
204            .await
205            .into_iter()
206            .collect::<Result<Vec<_>>>()?;
207
208        self.data.state = CreateFlowState::CreateMetadata;
209        Ok(Status::executing(true))
210    }
211
212    /// Creates flow metadata.
213    ///
214    /// Abort(not-retry):
215    /// - Failed to create table metadata.
216    async fn on_create_metadata(&mut self) -> Result<Status> {
217        // Safety: The flow id must be allocated.
218        let flow_id = self.data.flow_id.unwrap();
219        let (flow_info, flow_routes) = (&self.data).into();
220        if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
221            && self.data.task.or_replace
222        {
223            self.context
224                .flow_metadata_manager
225                .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
226                .await?;
227            info!("Replaced flow metadata for flow {flow_id}");
228            self.data.did_replace = true;
229        } else {
230            self.context
231                .flow_metadata_manager
232                .create_flow_metadata(flow_id, flow_info, flow_routes)
233                .await?;
234            info!("Created flow metadata for flow {flow_id}");
235        }
236
237        self.data.state = CreateFlowState::InvalidateFlowCache;
238        Ok(Status::executing(true))
239    }
240
241    async fn on_broadcast(&mut self) -> Result<Status> {
242        debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
243        // Safety: The flow id must be allocated.
244        let flow_id = self.data.flow_id.unwrap();
245        let did_replace = self.data.did_replace;
246        let ctx = Context {
247            subject: Some("Invalidate flow cache by creating flow".to_string()),
248        };
249
250        let mut caches = vec![];
251
252        // if did replaced, invalidate the flow cache with drop the old flow
253        if did_replace {
254            let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
255
256            // only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
257            caches.extend([CacheIdent::DropFlow(DropFlow {
258                flow_id,
259                source_table_ids: old_flow_info.source_table_ids.clone(),
260                flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
261            })]);
262        }
263
264        let (_flow_info, flow_routes) = (&self.data).into();
265        let flow_part2peers = flow_routes
266            .into_iter()
267            .map(|(part_id, route)| (part_id, route.peer))
268            .collect();
269
270        caches.extend([
271            CacheIdent::CreateFlow(CreateFlow {
272                flow_id,
273                source_table_ids: self.data.source_table_ids.clone(),
274                partition_to_peer_mapping: flow_part2peers,
275            }),
276            CacheIdent::FlowId(flow_id),
277        ]);
278
279        self.context
280            .cache_invalidator
281            .invalidate(&ctx, &caches)
282            .await?;
283
284        Ok(Status::done_with_output(flow_id))
285    }
286}
287
288#[async_trait]
289impl Procedure for CreateFlowProcedure {
290    fn type_name(&self) -> &str {
291        Self::TYPE_NAME
292    }
293
294    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
295        let state = &self.data.state;
296
297        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
298            .with_label_values(&[state.as_ref()])
299            .start_timer();
300
301        match state {
302            CreateFlowState::Prepare => self.on_prepare().await,
303            CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
304            CreateFlowState::CreateMetadata => self.on_create_metadata().await,
305            CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
306        }
307        .map_err(map_to_procedure_error)
308    }
309
310    fn dump(&self) -> ProcedureResult<String> {
311        serde_json::to_string(&self.data).context(ToJsonSnafu)
312    }
313
314    fn lock_key(&self) -> LockKey {
315        let catalog_name = &self.data.task.catalog_name;
316        let flow_name = &self.data.task.flow_name;
317        let sink_table_name = &self.data.task.sink_table_name;
318
319        LockKey::new(vec![
320            CatalogLock::Read(catalog_name).into(),
321            TableNameLock::new(
322                &sink_table_name.catalog_name,
323                &sink_table_name.schema_name,
324                &sink_table_name.catalog_name,
325            )
326            .into(),
327            FlowNameLock::new(catalog_name, flow_name).into(),
328        ])
329    }
330}
331
332pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
333    let flow_type = flow_task
334        .flow_options
335        .get(FlowType::FLOW_TYPE_KEY)
336        .map(|s| s.as_str());
337    match flow_type {
338        Some(FlowType::BATCHING) => Ok(FlowType::Batching),
339        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
340        Some(unknown) => UnexpectedSnafu {
341            err_msg: format!("Unknown flow type: {}", unknown),
342        }
343        .fail(),
344        None => Ok(FlowType::Batching),
345    }
346}
347
348/// The state of [CreateFlowProcedure].
349#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
350pub enum CreateFlowState {
351    /// Prepares to create the flow.
352    Prepare,
353    /// Creates flows on the flownode.
354    CreateFlows,
355    /// Invalidate flow cache.
356    InvalidateFlowCache,
357    /// Create metadata.
358    CreateMetadata,
359}
360
361/// The type of flow.
362#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
363pub enum FlowType {
364    /// The flow is a batching task.
365    Batching,
366    /// The flow is a streaming task.
367    Streaming,
368}
369
370impl FlowType {
371    pub const BATCHING: &str = "batching";
372    pub const STREAMING: &str = "streaming";
373    pub const FLOW_TYPE_KEY: &str = "flow_type";
374}
375
376impl Default for FlowType {
377    fn default() -> Self {
378        Self::Batching
379    }
380}
381
382impl fmt::Display for FlowType {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        match self {
385            FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
386            FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
387        }
388    }
389}
390
391/// The serializable data.
392#[derive(Debug, Serialize, Deserialize)]
393pub struct CreateFlowData {
394    pub(crate) state: CreateFlowState,
395    pub(crate) task: CreateFlowTask,
396    pub(crate) flow_id: Option<FlowId>,
397    pub(crate) peers: Vec<Peer>,
398    pub(crate) source_table_ids: Vec<TableId>,
399    pub(crate) query_context: QueryContext,
400    /// For verify if prev value is consistent when need to update flow metadata.
401    /// only set when `or_replace` is true.
402    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
403    /// Only set to true when replace actually happened.
404    /// This is used to determine whether to invalidate the cache.
405    #[serde(default)]
406    pub(crate) did_replace: bool,
407    pub(crate) flow_type: Option<FlowType>,
408}
409
410impl From<&CreateFlowData> for CreateRequest {
411    fn from(value: &CreateFlowData) -> Self {
412        let flow_id = value.flow_id.unwrap();
413        let source_table_ids = &value.source_table_ids;
414
415        let mut req = CreateRequest {
416            flow_id: Some(api::v1::FlowId { id: flow_id }),
417            source_table_ids: source_table_ids
418                .iter()
419                .map(|table_id| api::v1::TableId { id: *table_id })
420                .collect_vec(),
421            sink_table_name: Some(value.task.sink_table_name.clone().into()),
422            // Always be true to ensure idempotent in case of retry
423            create_if_not_exists: true,
424            or_replace: value.task.or_replace,
425            expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
426            comment: value.task.comment.clone(),
427            sql: value.task.sql.clone(),
428            flow_options: value.task.flow_options.clone(),
429        };
430
431        let flow_type = value.flow_type.unwrap_or_default().to_string();
432        req.flow_options
433            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
434        req
435    }
436}
437
438impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
439    fn from(value: &CreateFlowData) -> Self {
440        let CreateFlowTask {
441            catalog_name,
442            flow_name,
443            sink_table_name,
444            expire_after,
445            comment,
446            sql,
447            flow_options: mut options,
448            ..
449        } = value.task.clone();
450
451        let flownode_ids = value
452            .peers
453            .iter()
454            .enumerate()
455            .map(|(idx, peer)| (idx as u32, peer.id))
456            .collect::<BTreeMap<_, _>>();
457        let flow_routes = value
458            .peers
459            .iter()
460            .enumerate()
461            .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
462            .collect::<Vec<_>>();
463
464        let flow_type = value.flow_type.unwrap_or_default().to_string();
465        options.insert("flow_type".to_string(), flow_type);
466
467        let mut create_time = chrono::Utc::now();
468        if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
469            && value.task.or_replace
470        {
471            create_time = prev_flow_value.get_inner_ref().created_time;
472        }
473
474        let flow_info: FlowInfoValue = FlowInfoValue {
475            source_table_ids: value.source_table_ids.clone(),
476            sink_table_name,
477            flownode_ids,
478            catalog_name,
479            query_context: Some(value.query_context.clone()),
480            flow_name,
481            raw_sql: sql,
482            expire_after,
483            comment,
484            options,
485            created_time: create_time,
486            updated_time: chrono::Utc::now(),
487        };
488
489        (flow_info, flow_routes)
490    }
491}