Skip to main content

common_meta/ddl/
create_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod metadata;
16
17use std::collections::{BTreeMap, HashMap};
18use std::fmt;
19
20use api::v1::ExpireAfter;
21use api::v1::flow::flow_request::Body as PbFlowRequest;
22use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use strum::AsRefStr;
36use table::metadata::TableId;
37use table::table_name::TableName;
38
39use crate::cache_invalidator::Context;
40use crate::ddl::DdlContext;
41use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
42use crate::error::{self, Result, UnexpectedSnafu};
43use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
44use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
45use crate::key::flow::flow_route::FlowRouteValue;
46use crate::key::table_name::TableNameKey;
47use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
48use crate::lock_key::{CatalogLock, FlowNameLock};
49use crate::metrics;
50use crate::peer::Peer;
51use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
52
/// The procedure of flow creation.
///
/// Pairs the DDL execution context with the serializable [CreateFlowData]
/// that records how far the procedure has progressed.
pub struct CreateFlowProcedure {
    /// Managers and clients used by the procedure steps (flow/table metadata,
    /// node manager, cache invalidator).
    pub context: DdlContext,
    /// The procedure state; this is the part written out by `dump` and read
    /// back by `from_json`.
    pub data: CreateFlowData,
}
58
59impl CreateFlowProcedure {
60    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
61
62    /// Returns a new [CreateFlowProcedure].
63    pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
64        Self {
65            context,
66            data: CreateFlowData {
67                task,
68                flow_id: None,
69                peers: vec![],
70                source_table_ids: vec![],
71                unresolved_source_table_names: vec![],
72                flow_context: query_context.into(), // Convert to FlowQueryContext
73                state: CreateFlowState::Prepare,
74                prev_flow_info_value: None,
75                did_replace: false,
76                flow_type: None,
77            },
78        }
79    }
80
81    /// Deserializes from `json`.
82    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
83        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
84        Ok(CreateFlowProcedure { context, data })
85    }
86
87    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
88        let catalog_name = &self.data.task.catalog_name;
89        let flow_name = &self.data.task.flow_name;
90        let sink_table_name = &self.data.task.sink_table_name;
91        let create_if_not_exists = self.data.task.create_if_not_exists;
92        let or_replace = self.data.task.or_replace;
93
94        validate_flow_options(&self.data.task)?;
95
96        let flow_name_value = self
97            .context
98            .flow_metadata_manager
99            .flow_name_manager()
100            .get(catalog_name, flow_name)
101            .await?;
102
103        if create_if_not_exists && or_replace {
104            // this is forbidden because not clear what does that mean exactly
105            return error::UnsupportedSnafu {
106                operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
107            }
108            .fail();
109        }
110
111        if let Some(value) = flow_name_value {
112            ensure!(
113                create_if_not_exists || or_replace,
114                error::FlowAlreadyExistsSnafu {
115                    flow_name: format_full_flow_name(catalog_name, flow_name),
116                }
117            );
118
119            let flow_id = value.flow_id();
120            if create_if_not_exists {
121                info!("Flow already exists, flow_id: {}", flow_id);
122                return Ok(Status::done_with_output(flow_id));
123            }
124
125            let flow_id = value.flow_id();
126            let peers = self
127                .context
128                .flow_metadata_manager
129                .flow_route_manager()
130                .routes(flow_id)
131                .await?
132                .into_iter()
133                .map(|(_, value)| value.peer)
134                .collect::<Vec<_>>();
135            self.data.flow_id = Some(flow_id);
136            self.data.peers = peers;
137            info!("Replacing flow, flow_id: {}", flow_id);
138
139            let flow_info_value = self
140                .context
141                .flow_metadata_manager
142                .flow_info_manager()
143                .get_raw(flow_id)
144                .await?;
145
146            ensure!(
147                flow_info_value.is_some(),
148                error::FlowNotFoundSnafu {
149                    flow_name: format_full_flow_name(catalog_name, flow_name),
150                }
151            );
152
153            self.data.prev_flow_info_value = flow_info_value;
154        }
155
156        //  Ensures sink table doesn't exist.
157        let exists = self
158            .context
159            .table_metadata_manager
160            .table_name_manager()
161            .exists(TableNameKey::new(
162                &sink_table_name.catalog_name,
163                &sink_table_name.schema_name,
164                &sink_table_name.table_name,
165            ))
166            .await?;
167        // TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
168        // and require manually create sink table
169        if exists {
170            common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
171        }
172
173        self.collect_source_tables().await?;
174        ensure!(
175            self.data.unresolved_source_table_names.is_empty()
176                || defer_on_missing_source(&self.data.task)?,
177            error::UnsupportedSnafu {
178                operation: format!(
179                    "Create flow with missing source tables requires WITH ('{DEFER_ON_MISSING_SOURCE_KEY}'='true'): {}",
180                    self.data
181                        .unresolved_source_table_names
182                        .iter()
183                        .map(ToString::to_string)
184                        .join(", ")
185                )
186            }
187        );
188        self.ensure_supported_replace_transition()?;
189
190        // Validate that source and sink tables are not the same
191        let sink_table_name = &self.data.task.sink_table_name;
192        if self
193            .data
194            .task
195            .source_table_names
196            .iter()
197            .any(|source| source == sink_table_name)
198        {
199            return error::UnsupportedSnafu {
200                operation: format!(
201                    "Creating flow with source and sink table being the same: {}",
202                    sink_table_name
203                ),
204            }
205            .fail();
206        }
207
208        if self.data.flow_id.is_none() {
209            self.allocate_flow_id().await?;
210        }
211        self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
212
213        self.data.state = if self.data.is_pending() {
214            self.data.peers.clear();
215            CreateFlowState::CreateMetadata
216        } else {
217            CreateFlowState::CreateFlows
218        };
219
220        Ok(Status::executing(true))
221    }
222
223    fn ensure_supported_replace_transition(&self) -> Result<()> {
224        if !self.data.task.or_replace {
225            return Ok(());
226        }
227
228        let Some(prev_flow_info) = self.data.prev_flow_info_value.as_ref() else {
229            return Ok(());
230        };
231        let prev_pending = prev_flow_info.get_inner_ref().is_pending();
232        let new_pending = self.data.is_pending();
233        ensure!(
234            prev_pending == new_pending,
235            error::UnsupportedSnafu {
236                operation: "Replacing between pending and active flow states is not supported yet"
237            }
238        );
239
240        Ok(())
241    }
242
243    async fn on_flownode_create_flows(&mut self) -> Result<Status> {
244        // Safety: must be allocated.
245        let mut create_flow = Vec::with_capacity(self.data.peers.len());
246        for peer in &self.data.peers {
247            let requester = self.context.node_manager.flownode(peer).await;
248            let request = FlowRequest {
249                header: Some(FlowRequestHeader {
250                    tracing_context: TracingContext::from_current_span().to_w3c(),
251                    // Convert FlowQueryContext to QueryContext
252                    query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
253                }),
254                body: Some(PbFlowRequest::Create((&self.data).into())),
255            };
256            create_flow.push(async move {
257                requester
258                    .handle(request)
259                    .await
260                    .map_err(add_peer_context_if_needed(peer.clone()))
261            });
262        }
263        info!(
264            "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
265            self.data.flow_id, self.data.flow_type, self.data.peers
266        );
267        join_all(create_flow)
268            .await
269            .into_iter()
270            .collect::<Result<Vec<_>>>()?;
271
272        self.data.state = CreateFlowState::CreateMetadata;
273        Ok(Status::executing(true))
274    }
275
276    /// Creates flow metadata.
277    ///
278    /// Abort(not-retry):
279    /// - Failed to create table metadata.
280    async fn on_create_metadata(&mut self) -> Result<Status> {
281        // Safety: The flow id must be allocated.
282        let flow_id = self.data.flow_id.unwrap();
283        let (flow_info, flow_routes) = (&self.data).into();
284        if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
285            && self.data.task.or_replace
286        {
287            self.context
288                .flow_metadata_manager
289                .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
290                .await?;
291            info!("Replaced flow metadata for flow {flow_id}");
292            self.data.did_replace = true;
293        } else {
294            self.context
295                .flow_metadata_manager
296                .create_flow_metadata(flow_id, flow_info, flow_routes)
297                .await?;
298            info!("Created flow metadata for flow {flow_id}");
299        }
300
301        self.data.state = CreateFlowState::InvalidateFlowCache;
302        Ok(Status::executing(true))
303    }
304
305    async fn on_broadcast(&mut self) -> Result<Status> {
306        debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
307        // Safety: The flow id must be allocated.
308        let flow_id = self.data.flow_id.unwrap();
309        let did_replace = self.data.did_replace;
310        let ctx = Context {
311            subject: Some("Invalidate flow cache by creating flow".to_string()),
312        };
313
314        let mut caches = vec![];
315
316        // if did replaced, invalidate the flow cache with drop the old flow
317        if did_replace {
318            let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
319
320            // only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
321            caches.extend([CacheIdent::DropFlow(DropFlow {
322                flow_id,
323                source_table_ids: old_flow_info.source_table_ids.clone(),
324                flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
325            })]);
326        }
327
328        let (_flow_info, flow_routes) = (&self.data).into();
329        let flow_part2peers = flow_routes
330            .into_iter()
331            .map(|(part_id, route)| (part_id, route.peer))
332            .collect();
333
334        caches.extend([
335            CacheIdent::CreateFlow(CreateFlow {
336                flow_id,
337                source_table_ids: self.data.source_table_ids.clone(),
338                partition_to_peer_mapping: flow_part2peers,
339            }),
340            CacheIdent::FlowId(flow_id),
341        ]);
342
343        self.context
344            .cache_invalidator
345            .invalidate(&ctx, &caches)
346            .await?;
347
348        Ok(Status::done_with_output(flow_id))
349    }
350}
351
352#[async_trait]
353impl Procedure for CreateFlowProcedure {
354    fn type_name(&self) -> &str {
355        Self::TYPE_NAME
356    }
357
358    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
359        let state = &self.data.state;
360
361        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
362            .with_label_values(&[state.as_ref()])
363            .start_timer();
364
365        match state {
366            CreateFlowState::Prepare => self.on_prepare().await,
367            CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
368            CreateFlowState::CreateMetadata => self.on_create_metadata().await,
369            CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
370        }
371        .map_err(map_to_procedure_error)
372    }
373
374    fn dump(&self) -> ProcedureResult<String> {
375        serde_json::to_string(&self.data).context(ToJsonSnafu)
376    }
377
378    fn lock_key(&self) -> LockKey {
379        let catalog_name = &self.data.task.catalog_name;
380        let flow_name = &self.data.task.flow_name;
381
382        LockKey::new(vec![
383            CatalogLock::Read(catalog_name).into(),
384            FlowNameLock::new(catalog_name, flow_name).into(),
385        ])
386    }
387}
388
389pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
390    let flow_type = flow_task
391        .flow_options
392        .get(FlowType::FLOW_TYPE_KEY)
393        .map(|s| s.as_str());
394    match flow_type {
395        Some(FlowType::BATCHING) => Ok(FlowType::Batching),
396        Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
397        Some(unknown) => UnexpectedSnafu {
398            err_msg: format!("Unknown flow type: {}", unknown),
399        }
400        .fail(),
401        None => Ok(FlowType::Batching),
402    }
403}
404
/// The flow option key for creating pending flow metadata when source tables do not exist.
///
/// The value is parsed as a boolean by `defer_on_missing_source`; the key is
/// removed from the options that are sent to flownodes.
pub const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
407
408pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
409    flow_task
410        .flow_options
411        .get(DEFER_ON_MISSING_SOURCE_KEY)
412        .map(|value| {
413            value
414                .trim()
415                .to_ascii_lowercase()
416                .parse::<bool>()
417                .map_err(|_| {
418                    error::UnexpectedSnafu {
419                        err_msg: format!(
420                            "Invalid flow option '{DEFER_ON_MISSING_SOURCE_KEY}': {value}"
421                        ),
422                    }
423                    .build()
424                })
425        })
426        .transpose()
427        .map(|value| value.unwrap_or(false))
428}
429
430pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
431    for key in flow_task.flow_options.keys() {
432        match key.as_str() {
433            DEFER_ON_MISSING_SOURCE_KEY
434            | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY
435            | FlowType::FLOW_TYPE_KEY => {}
436            unknown => {
437                return UnexpectedSnafu {
438                    err_msg: format!(
439                        "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}"
440                    ),
441                }
442                .fail();
443            }
444        }
445    }
446
447    defer_on_missing_source(flow_task)?;
448    get_flow_type_from_options(flow_task)?;
449    Ok(())
450}
451
452fn user_runtime_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
453    let mut options = options.clone();
454    options.remove(DEFER_ON_MISSING_SOURCE_KEY);
455    options
456}
457
/// Returns an unfiltered copy of `options`, used for the stored flow metadata.
fn metadata_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
    options.to_owned()
}
461
/// The state of [CreateFlowProcedure].
///
/// The variants are not declared in execution order: the procedure runs
/// `Prepare`, then `CreateFlows` (skipped for a pending flow), then
/// `CreateMetadata`, and finally `InvalidateFlowCache`.
///
/// The variant name is also used as the label of the procedure timing metric.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Prepares to create the flow.
    Prepare,
    /// Creates flows on the flownode.
    CreateFlows,
    /// Invalidate flow cache. This is the last state; it runs after `CreateMetadata`.
    InvalidateFlowCache,
    /// Create metadata, or update it when an existing flow is replaced.
    CreateMetadata,
}
474
/// The type of flow.
///
/// `Batching` is the default; it is also what an absent `flow_type` option
/// resolves to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum FlowType {
    /// The flow is a batching task.
    #[default]
    Batching,
    /// The flow is a streaming task.
    Streaming,
}
484
/// The flow option key `experimental_enable_incremental_read`.
///
/// `validate_flow_options` accepts it as a user option; its value is not
/// interpreted in this file.
pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str =
    "experimental_enable_incremental_read";
487
488impl FlowType {
489    pub const BATCHING: &str = "batching";
490    pub const STREAMING: &str = "streaming";
491    pub const FLOW_TYPE_KEY: &str = "flow_type";
492}
493
494impl fmt::Display for FlowType {
495    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496        match self {
497            FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
498            FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
499        }
500    }
501}
502
/// The serializable data.
///
/// This is what the procedure dumps to JSON and is rebuilt from.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// The step the procedure runs next.
    pub(crate) state: CreateFlowState,
    /// The create flow request being executed.
    pub(crate) task: CreateFlowTask,
    /// `None` until the prepare step either allocates an id or takes the id
    /// of the flow that is being replaced.
    pub(crate) flow_id: Option<FlowId>,
    /// The flownodes the flow is created on; cleared for a pending flow.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the source tables.
    pub(crate) source_table_ids: Vec<TableId>,
    /// Source tables that are still unresolved; a non-empty list makes the
    /// flow pending. Defaults to empty when absent from the serialized data.
    #[serde(default)]
    pub(crate) unresolved_source_table_names: Vec<TableName>,
    /// Use alias for backward compatibility with QueryContext serialized data
    #[serde(alias = "query_context")]
    pub(crate) flow_context: FlowQueryContext,
    /// For verify if prev value is consistent when need to update flow metadata.
    /// only set when `or_replace` is true.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Only set to true when replace actually happened.
    /// This is used to determine whether to invalidate the cache.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// The flow type resolved from the flow options by the prepare step;
    /// readers fall back to the default type while it is `None`.
    pub(crate) flow_type: Option<FlowType>,
}
525
526impl CreateFlowData {
527    pub(crate) fn is_pending(&self) -> bool {
528        !self.unresolved_source_table_names.is_empty()
529    }
530
531    pub(crate) fn is_active(&self) -> bool {
532        !self.is_pending()
533    }
534}
535
536impl From<&CreateFlowData> for CreateRequest {
537    fn from(value: &CreateFlowData) -> Self {
538        let flow_id = value.flow_id.unwrap();
539        let source_table_ids = &value.source_table_ids;
540
541        let mut req = CreateRequest {
542            flow_id: Some(api::v1::FlowId { id: flow_id }),
543            source_table_ids: source_table_ids
544                .iter()
545                .map(|table_id| api::v1::TableId { id: *table_id })
546                .collect_vec(),
547            sink_table_name: Some(value.task.sink_table_name.clone().into()),
548            // Always be true to ensure idempotent in case of retry
549            create_if_not_exists: true,
550            or_replace: value.task.or_replace,
551            expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
552            eval_interval: value
553                .task
554                .eval_interval_secs
555                .map(|seconds| api::v1::EvalInterval { seconds }),
556            comment: value.task.comment.clone(),
557            sql: value.task.sql.clone(),
558            flow_options: user_runtime_flow_options(&value.task.flow_options),
559        };
560
561        let flow_type = value.flow_type.unwrap_or_default().to_string();
562        req.flow_options
563            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
564        req
565    }
566}
567
568impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
569    fn from(value: &CreateFlowData) -> Self {
570        let CreateFlowTask {
571            catalog_name,
572            flow_name,
573            sink_table_name,
574            expire_after,
575            eval_interval_secs: eval_interval,
576            comment,
577            sql,
578            ..
579        } = value.task.clone();
580        let mut options = metadata_flow_options(&value.task.flow_options);
581
582        let flownode_ids = value
583            .peers
584            .iter()
585            .enumerate()
586            .map(|(idx, peer)| (idx as u32, peer.id))
587            .collect::<BTreeMap<_, _>>();
588        let flow_routes = value
589            .peers
590            .iter()
591            .enumerate()
592            .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
593            .collect::<Vec<_>>();
594
595        let flow_type = value.flow_type.unwrap_or_default().to_string();
596        options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
597
598        let mut create_time = chrono::Utc::now();
599        if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
600            && value.task.or_replace
601        {
602            create_time = prev_flow_value.get_inner_ref().created_time;
603        }
604
605        let flow_info: FlowInfoValue = FlowInfoValue {
606            source_table_ids: value.source_table_ids.clone(),
607            all_source_table_names: value.task.source_table_names.clone(),
608            unresolved_source_table_names: value.unresolved_source_table_names.clone(),
609            sink_table_name,
610            flownode_ids,
611            catalog_name,
612            // Convert FlowQueryContext back to QueryContext for storage
613            query_context: Some(QueryContext::from(value.flow_context.clone())),
614            flow_name,
615            raw_sql: sql,
616            expire_after,
617            eval_interval_secs: eval_interval,
618            comment,
619            options,
620            status: if value.is_active() {
621                FlowStatus::Active
622            } else {
623                FlowStatus::PendingSources
624            },
625            created_time: create_time,
626            updated_time: chrono::Utc::now(),
627        };
628
629        (flow_info, flow_routes)
630    }
631}