Skip to main content

flow/adapter/
flownode_impl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use api::v1::flow::{
21    CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse, FlushFlow,
22    flow_request,
23};
24use api::v1::region::InsertRequests;
25use catalog::CatalogManager;
26use common_base::Plugins;
27use common_error::ext::BoxedError;
28use common_meta::ddl::create_flow::{
29    FlowType, INTERNAL_EVAL_SCHEDULE_KEY, effective_eval_schedule_from_flow_info,
30};
31use common_meta::error::Result as MetaResult;
32use common_meta::key::flow::FlowMetadataManager;
33use common_meta::key::flow::flow_info::FlowScheduleConfig;
34use common_meta::key::flow::flow_state::FlowStat;
35use common_runtime::JoinHandle;
36use common_telemetry::{error, info, trace, warn};
37use datatypes::value::Value;
38use futures::TryStreamExt;
39use itertools::Itertools;
40use operator::utils::try_to_session_query_context;
41use session::context::QueryContextBuilder;
42use snafu::{IntoError, OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use tokio::sync::{Mutex, RwLock};
45
46use crate::adapter::{CreateFlowArgs, StreamingEngine};
47use crate::batching_mode::engine::BatchingEngine;
48use crate::engine::{FlowEngine, FlowStatProvider};
49use crate::error::{
50    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
51    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
52    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, UnsupportedSnafu,
53};
54use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
55use crate::repr::{self, DiffRow};
56use crate::utils::StateReportHandler;
57use crate::{Error, FlowId};
58
/// Shared, reference-counted ([`Arc`]) handle to a [`FlowDualEngine`].
pub type FlowDualEngineRef = Arc<FlowDualEngine>;
61
/// Manage both streaming and batching mode engine
///
/// including create/drop/flush flow
/// and redirect insert requests to the appropriate engine
pub struct FlowDualEngine {
    /// Engine that `create_flow` dispatches to for flows of type `FlowType::Streaming`.
    streaming_engine: Arc<StreamingEngine>,
    /// Engine that `create_flow` dispatches to for flows of type `FlowType::Batching`
    /// (also used when no flow type option is given).
    batching_engine: Arc<BatchingEngine>,
    /// receive a oneshot sender to send state report
    ///
    /// Taken (left as `None`) by `start_state_report_task` when it spawns the report task.
    state_report_handler: RwLock<Option<StateReportHandler>>,
    /// helper struct for faster query flow by table id or vice versa
    src_table2flow: RwLock<SrcTableToFlow>,
    /// Flow metadata store; the consistency check reads the expected flows from it.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Used to enumerate catalog names when all flows must be listed
    /// (no node id set, i.e. standalone mode).
    catalog_manager: Arc<dyn CatalogManager>,
    /// Background consistency-check task; `None` until
    /// `start_flow_consistent_check_task` succeeds, and taken out again by
    /// `stop_flow_consistent_check_task`.
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    /// Plugins handed in at construction, exposed through `plugins()`.
    plugins: Plugins,
    /// `false` at construction; set to `true` by `set_done_recovering`.
    done_recovering: AtomicBool,
}
79
80impl FlowDualEngine {
81    pub fn new(
82        streaming_engine: Arc<StreamingEngine>,
83        batching_engine: Arc<BatchingEngine>,
84        flow_metadata_manager: Arc<FlowMetadataManager>,
85        catalog_manager: Arc<dyn CatalogManager>,
86        plugins: Plugins,
87    ) -> Self {
88        Self {
89            streaming_engine,
90            batching_engine,
91            state_report_handler: Default::default(),
92            src_table2flow: RwLock::new(SrcTableToFlow::default()),
93            flow_metadata_manager,
94            catalog_manager,
95            check_task: Mutex::new(None),
96            plugins,
97            done_recovering: AtomicBool::new(false),
98        }
99    }
100
101    /// Set `done_recovering` to true
102    /// indicate that we are ready to handle requests
103    pub fn set_done_recovering(&self) {
104        info!("FlowDualEngine done recovering");
105        self.done_recovering
106            .store(true, std::sync::atomic::Ordering::Release);
107    }
108
109    /// Check if `done_recovering` is true
110    pub fn is_recover_done(&self) -> bool {
111        self.done_recovering
112            .load(std::sync::atomic::Ordering::Acquire)
113    }
114
115    /// wait for recovering to be done, this will only happen when flownode just started
116    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
117        if self.is_recover_done() {
118            return Ok(());
119        }
120
121        warn!(
122            "FlowDualEngine is not done recovering, {} insert request waiting for recovery",
123            waiting_req_cnt
124        );
125        // wait 3 seconds, check every 1 second
126        // TODO(discord9): make this configurable
127        let mut retry = 0;
128        let max_retry = 3;
129        while retry < max_retry && !self.is_recover_done() {
130            warn!(
131                "FlowDualEngine is not done recovering, retry {} in 1s",
132                retry
133            );
134            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
135            retry += 1;
136        }
137        if retry == max_retry {
138            return FlowNotRecoveredSnafu.fail();
139        } else {
140            info!("FlowDualEngine is done recovering");
141        }
142        // TODO(discord9): also put to centralized logging for flow once it implemented
143        Ok(())
144    }
145
146    pub fn plugins(&self) -> &Plugins {
147        &self.plugins
148    }
149
150    /// Determine if the engine is in distributed mode
151    pub fn is_distributed(&self) -> bool {
152        self.streaming_engine.node_id.is_some()
153    }
154
155    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
156        self.streaming_engine.clone()
157    }
158
159    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
160        self.batching_engine.clone()
161    }
162
163    pub async fn set_state_report_handler(&self, handler: StateReportHandler) {
164        *self.state_report_handler.write().await = Some(handler);
165    }
166
167    pub async fn gen_state_report(&self) -> FlowStat {
168        let streaming = self.streaming_engine.flow_stat().await;
169        let batching = self.batching_engine.flow_stat().await;
170
171        let mut state_size = streaming.state_size;
172        state_size.extend(batching.state_size);
173
174        let mut last_exec_time_map = streaming.last_exec_time_map;
175        last_exec_time_map.extend(batching.last_exec_time_map);
176
177        FlowStat {
178            state_size,
179            last_exec_time_map,
180        }
181    }
182
183    /// Start state report task, which receives a sender from heartbeat task and sends report back.
184    ///
185    /// if heartbeat task is shutdown, this future exits too.
186    pub async fn start_state_report_task(self: Arc<Self>) -> Option<JoinHandle<()>> {
187        let state_report_handler = self.state_report_handler.write().await.take();
188        if let Some(mut handler) = state_report_handler {
189            let zelf = self.clone();
190            let handler = common_runtime::spawn_global(async move {
191                while let Some(ret_handler) = handler.recv().await {
192                    let state_report = zelf.gen_state_report().await;
193                    ret_handler.send(state_report).unwrap_or_else(|err| {
194                        common_telemetry::error!(err; "Send state report error");
195                    });
196                }
197            });
198            Some(handler)
199        } else {
200            None
201        }
202    }
203
204    /// In distributed mode, scan periodically(1s) until all advertised frontends
205    /// accept unauthenticated queries, or timeout. In standalone mode, return
206    /// immediately.
207    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
208        if !self.is_distributed() {
209            return Ok(());
210        }
211        let frontend_client = self.batching_engine().frontend_client.clone();
212        let sleep_duration = std::time::Duration::from_millis(1_000);
213        let now = std::time::Instant::now();
214        loop {
215            let frontend_list = frontend_client.scan_for_frontend().await?;
216            if !frontend_list.is_empty() {
217                let fe_list = frontend_list
218                    .iter()
219                    .map(|peer| &peer.addr)
220                    .collect::<Vec<_>>();
221                let probe_failures = frontend_client
222                    .check_all_frontends_without_auth(&frontend_list)
223                    .await?;
224                if probe_failures.is_empty() {
225                    info!(
226                        "Available frontend found and unauthenticated probe succeeded: {:?}",
227                        fe_list
228                    );
229                    return Ok(());
230                }
231                warn!(
232                    "Unauthenticated frontend probe failed, will retry. frontends={:?}, failures={:?}",
233                    fe_list, probe_failures
234                );
235            }
236            let elapsed = now.elapsed();
237            tokio::time::sleep(sleep_duration).await;
238            info!("Waiting for available frontend, elapsed={:?}", elapsed);
239            if elapsed >= timeout {
240                return NoAvailableFrontendSnafu {
241                    timeout,
242                    context: "No frontend accepted unauthenticated flownode probe",
243                }
244                .fail();
245            }
246        }
247    }
248
249    /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
250    ///
251    /// the need to sync is to make sure flush flow actually get called
252    async fn try_sync_with_check_task(
253        &self,
254        flow_id: FlowId,
255        allow_drop: bool,
256    ) -> Result<(), Error> {
257        // this function rarely get called so adding some log is helpful
258        info!("Try to sync with check task for flow {}", flow_id);
259        let mut retry = 0;
260        let max_retry = 10;
261        // keep trying to trigger consistent check
262        while retry < max_retry {
263            if let Some(task) = self.check_task.lock().await.as_ref() {
264                task.trigger(false, allow_drop).await?;
265                break;
266            }
267            retry += 1;
268            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
269        }
270
271        if retry == max_retry {
272            error!(
273                "Can't sync with check task for flow {} with allow_drop={}",
274                flow_id, allow_drop
275            );
276            return SyncCheckTaskSnafu {
277                flow_id,
278                allow_drop,
279            }
280            .fail();
281        }
282        info!("Successfully sync with check task for flow {}", flow_id);
283
284        Ok(())
285    }
286
287    /// Spawn a task to consistently check if all flow tasks in metasrv is created on flownode,
288    /// so on startup, this will create all missing flow tasks, and constantly check at a interval
289    async fn check_flow_consistent(
290        &self,
291        allow_create: bool,
292        allow_drop: bool,
293    ) -> Result<(), Error> {
294        // use nodeid to determine if this is standalone/distributed mode, and retrieve all flows in this node(in distributed mode)/or all flows(in standalone mode)
295        let nodeid = self.streaming_engine.node_id;
296        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
297            // nodeid is available, so we only need to check flows on this node
298            // which also means we are in distributed mode
299            let to_be_recover = self
300                .flow_metadata_manager
301                .flownode_flow_manager()
302                .flows(nodeid.into())
303                .try_collect::<Vec<_>>()
304                .await
305                .context(ListFlowsSnafu {
306                    id: Some(nodeid.into()),
307                })?;
308            to_be_recover.into_iter().map(|(id, _)| id).collect()
309        } else {
310            // nodeid is not available, so we need to check all flows
311            // which also means we are in standalone mode
312            let all_catalogs = self
313                .catalog_manager
314                .catalog_names()
315                .await
316                .map_err(BoxedError::new)
317                .context(ExternalSnafu)?;
318            let mut all_flow_ids = vec![];
319            for catalog in all_catalogs {
320                let flows = self
321                    .flow_metadata_manager
322                    .flow_name_manager()
323                    .flow_names(&catalog)
324                    .await
325                    .try_collect::<Vec<_>>()
326                    .await
327                    .map_err(BoxedError::new)
328                    .context(ExternalSnafu)?;
329
330                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
331            }
332            all_flow_ids
333        };
334        let should_exists = should_exists
335            .into_iter()
336            .map(|i| i as FlowId)
337            .collect::<HashSet<_>>();
338        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
339        let to_be_created = should_exists
340            .iter()
341            .filter(|id| !actual_exists.contains(id))
342            .collect::<Vec<_>>();
343        let to_be_dropped = actual_exists
344            .iter()
345            .filter(|id| !should_exists.contains(id))
346            .collect::<Vec<_>>();
347
348        if !to_be_created.is_empty() {
349            if allow_create {
350                info!(
351                    "Recovering {} flows: {:?}",
352                    to_be_created.len(),
353                    to_be_created
354                );
355                let mut errors = vec![];
356                for flow_id in to_be_created.clone() {
357                    let flow_id = *flow_id;
358                    let info = self
359                        .flow_metadata_manager
360                        .flow_info_manager()
361                        .get(flow_id as u32)
362                        .await
363                        .map_err(BoxedError::new)
364                        .context(ExternalSnafu)?
365                        .context(FlowNotFoundSnafu { id: flow_id })?;
366
367                    let sink_table_name = [
368                        info.sink_table_name().catalog_name.clone(),
369                        info.sink_table_name().schema_name.clone(),
370                        info.sink_table_name().table_name.clone(),
371                    ];
372                    let args = CreateFlowArgs {
373                        flow_id,
374                        sink_table_name,
375                        source_table_ids: info.source_table_ids().to_vec(),
376                        // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
377                        // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
378                        // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
379                        create_if_not_exists: true,
380                        or_replace: true,
381                        expire_after: info.expire_after(),
382                        eval_interval: info.eval_interval(),
383                        comment: Some(info.comment().clone()),
384                        sql: info.raw_sql().clone(),
385                        flow_options: info.options().clone(),
386                        eval_schedule: effective_eval_schedule_from_flow_info(&info),
387                        query_ctx: info
388                            .query_context()
389                            .clone()
390                            .map(|ctx| {
391                                try_to_session_query_context(ctx)
392                                    .map_err(BoxedError::new)
393                                    .context(ExternalSnafu)
394                            })
395                            .transpose()?
396                            // or use default QueryContext with catalog_name from info
397                            // to keep compatibility with old version
398                            .or_else(|| {
399                                Some(
400                                    QueryContextBuilder::default()
401                                        .current_catalog(info.catalog_name().clone())
402                                        .build(),
403                                )
404                            }),
405                    };
406                    if let Err(err) = self
407                        .create_flow(args)
408                        .await
409                        .map_err(BoxedError::new)
410                        .with_context(|_| CreateFlowSnafu {
411                            sql: info.raw_sql().clone(),
412                        })
413                    {
414                        errors.push((flow_id, err));
415                    }
416                }
417                if errors.is_empty() {
418                    info!("Recover flows successfully, flows: {:?}", to_be_created);
419                }
420
421                for (flow_id, err) in errors {
422                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
423                }
424            } else {
425                warn!(
426                    "Flows do not exist in flownode for node {:?}, flow_ids={:?}",
427                    nodeid, to_be_created
428                );
429            }
430        }
431        if !to_be_dropped.is_empty() {
432            if allow_drop {
433                info!("Dropping flows: {:?}", to_be_dropped);
434                let mut errors = vec![];
435                for flow_id in to_be_dropped {
436                    let flow_id = *flow_id;
437                    if let Err(err) = self.remove_flow(flow_id).await {
438                        errors.push((flow_id, err));
439                    }
440                }
441                for (flow_id, err) in errors {
442                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
443                }
444            } else {
445                warn!(
446                    "Flows do not exist in metadata for node {:?}, flow_ids={:?}",
447                    nodeid, to_be_dropped
448                );
449            }
450        }
451        Ok(())
452    }
453
454    // TODO(discord9): consider sync this with heartbeat(might become necessary in the future)
455    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
456        let mut check_task = self.check_task.lock().await;
457        ensure!(
458            check_task.is_none(),
459            IllegalCheckTaskStateSnafu {
460                reason: "Flow consistent check task already exists",
461            }
462        );
463        let task = ConsistentCheckTask::start_check_task(self).await?;
464        *check_task = Some(task);
465        Ok(())
466    }
467
468    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
469        info!("Stopping flow consistent check task");
470        let mut check_task = self.check_task.lock().await;
471
472        ensure!(
473            check_task.is_some(),
474            IllegalCheckTaskStateSnafu {
475                reason: "Flow consistent check task does not exist",
476            }
477        );
478
479        check_task.take().unwrap().stop().await?;
480        info!("Stopped flow consistent check task");
481        Ok(())
482    }
483
484    /// Reconciles in-memory flow tasks from persisted metadata.
485    pub async fn reconcile_flows_from_metadata(&self) -> Result<(), Error> {
486        self.check_flow_consistent(true, true).await
487    }
488
489    /// TODO(discord9): also add a `exists` api using flow metadata manager's `exists` method
490    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
491        self.flow_metadata_manager
492            .flow_info_manager()
493            .get(flow_id as u32)
494            .await
495            .map_err(BoxedError::new)
496            .context(ExternalSnafu)
497            .map(|info| info.is_some())
498    }
499}
500
/// Handle to the background task that keeps the flows on this node consistent
/// with flow metadata.
struct ConsistentCheckTask {
    /// Join handle of the spawned task; aborted by `stop`.
    handle: JoinHandle<()>,
    /// Sends the shutdown signal that makes the task leave its loop.
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    /// Requests an immediate check as `(allow_create, allow_drop, done)`; the
    /// task fires `done` once that check has run.
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}
506
507impl ConsistentCheckTask {
508    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
509        let engine = engine.clone();
510        let min_refresh_duration = engine
511            .batching_engine()
512            .batch_opts
513            .experimental_min_refresh_duration;
514        let frontend_scan_timeout = engine
515            .batching_engine()
516            .batch_opts
517            .experimental_frontend_scan_timeout;
518        engine
519            .wait_for_available_frontend(frontend_scan_timeout)
520            .await?;
521        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
522        let (trigger_tx, mut trigger_rx) =
523            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
524        let handle = common_runtime::spawn_global(async move {
525            // Recover flows after the startup frontend probe succeeds.
526            let mut recover_retry = 0;
527            while let Err(err) = engine.check_flow_consistent(true, false).await {
528                recover_retry += 1;
529                error!(
530                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
531                    recover_retry,
532                    min_refresh_duration.as_secs()
533                );
534                tokio::time::sleep(min_refresh_duration).await;
535            }
536
537            engine.set_done_recovering();
538
539            // then do check flows, with configurable allow_create and allow_drop
540            let (mut allow_create, mut allow_drop) = (false, false);
541            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
542            loop {
543                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
544                    error!(err; "Failed to check flow consistent");
545                }
546                if let Some(done) = ret_signal.take() {
547                    let _ = done.send(());
548                }
549                tokio::select! {
550                    _ = rx.recv() => break,
551                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
552                        (allow_create, allow_drop) = (incoming.0, incoming.1);
553                        ret_signal = Some(incoming.2);
554                    },
555                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
556                        (allow_create, allow_drop) = (false, false);
557                    },
558                }
559            }
560        });
561        Ok(ConsistentCheckTask {
562            handle,
563            shutdown_tx: tx,
564            trigger_tx,
565        })
566    }
567
568    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
569        let (tx, rx) = tokio::sync::oneshot::channel();
570        self.trigger_tx
571            .send((allow_create, allow_drop, tx))
572            .await
573            .map_err(|_| {
574                IllegalCheckTaskStateSnafu {
575                    reason: "Failed to send trigger signal",
576                }
577                .build()
578            })?;
579        rx.await.map_err(|_| {
580            IllegalCheckTaskStateSnafu {
581                reason: "Failed to receive trigger signal",
582            }
583            .build()
584        })?;
585        Ok(())
586    }
587
588    async fn stop(self) -> Result<(), Error> {
589        self.shutdown_tx.send(()).await.map_err(|_| {
590            IllegalCheckTaskStateSnafu {
591                reason: "Failed to send shutdown signal",
592            }
593            .build()
594        })?;
595        // abort so no need to wait
596        self.handle.abort();
597        Ok(())
598    }
599}
600
/// Lookup tables between source tables and flows, kept separately per flow mode.
#[derive(Default)]
struct SrcTableToFlow {
    /// mapping of table ids to flow ids for streaming mode
    stream: HashMap<TableId, HashSet<FlowId>>,
    /// mapping of table ids to flow ids for batching mode
    batch: HashMap<TableId, HashSet<FlowId>>,
    /// mapping of flow ids to (flow type, source table ids)
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}
610
611impl SrcTableToFlow {
612    fn in_stream(&self, table_id: TableId) -> bool {
613        self.stream.contains_key(&table_id)
614    }
615    fn in_batch(&self, table_id: TableId) -> bool {
616        self.batch.contains_key(&table_id)
617    }
618    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
619        let mapping = match flow_type {
620            FlowType::Streaming => &mut self.stream,
621            FlowType::Batching => &mut self.batch,
622        };
623
624        for src_table in src_table_ids.clone() {
625            mapping
626                .entry(src_table)
627                .and_modify(|flows| {
628                    flows.insert(flow_id);
629                })
630                .or_insert_with(|| {
631                    let mut set = HashSet::new();
632                    set.insert(flow_id);
633                    set
634                });
635        }
636        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
637    }
638
639    fn remove_flow(&mut self, flow_id: FlowId) {
640        let mapping = match self.get_flow_type(flow_id) {
641            Some(FlowType::Streaming) => &mut self.stream,
642            Some(FlowType::Batching) => &mut self.batch,
643            None => return,
644        };
645        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
646            for src_table in src_table_ids {
647                if let Some(flows) = mapping.get_mut(&src_table) {
648                    flows.remove(&flow_id);
649                }
650            }
651        }
652    }
653
654    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
655        self.flow_infos
656            .get(&flow_id)
657            .map(|(flow_type, _)| flow_type)
658            .cloned()
659    }
660}
661
662impl FlowEngine for FlowDualEngine {
663    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
664        let flow_type = args
665            .flow_options
666            .get(FlowType::FLOW_TYPE_KEY)
667            .map(|s| s.as_str());
668
669        let flow_type = match flow_type {
670            Some(FlowType::BATCHING) => FlowType::Batching,
671            Some(FlowType::STREAMING) => FlowType::Streaming,
672            None => FlowType::Batching,
673            Some(flow_type) => {
674                return InternalSnafu {
675                    reason: format!("Invalid flow type: {}", flow_type),
676                }
677                .fail();
678            }
679        };
680
681        let flow_id = args.flow_id;
682        let src_table_ids = args.source_table_ids.clone();
683
684        let res = match flow_type {
685            FlowType::Batching => self.batching_engine.create_flow(args).await,
686            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
687        }?;
688
689        self.src_table2flow
690            .write()
691            .await
692            .add_flow(flow_id, flow_type, src_table_ids);
693
694        Ok(res)
695    }
696
697    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
698        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
699
700        match flow_type {
701            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
702            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
703            None => {
704                // this can happen if flownode just restart, and is stilling creating the flow
705                // since now that this flow should dropped, we need to trigger the consistent check and allow drop
706                // this rely on drop flow ddl delete metadata first, see src/common/meta/src/ddl/drop_flow.rs
707                warn!(
708                    "Flow {} is not exist in the underlying engine, but exist in metadata",
709                    flow_id
710                );
711                self.try_sync_with_check_task(flow_id, true).await?;
712
713                Ok(())
714            }
715        }?;
716        // remove mapping
717        self.src_table2flow.write().await.remove_flow(flow_id);
718        Ok(())
719    }
720
721    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
722        // sync with check task
723        self.try_sync_with_check_task(flow_id, false).await?;
724        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
725        match flow_type {
726            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
727            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
728            None => {
729                warn!(
730                    "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
731                );
732                Ok(0)
733            }
734        }
735    }
736
737    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
738        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
739        // not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine
740        match flow_type {
741            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
742            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
743            None => Ok(false),
744        }
745    }
746
747    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
748        let stream_flows = self.streaming_engine.list_flows().await?;
749        let batch_flows = self.batching_engine.list_flows().await?;
750
751        Ok(stream_flows.into_iter().chain(batch_flows))
752    }
753
754    async fn handle_flow_inserts(
755        &self,
756        request: api::v1::region::InsertRequests,
757    ) -> Result<(), Error> {
758        self.wait_for_all_flow_recover(request.requests.len())
759            .await?;
760        // TODO(discord9): make as little clone as possible
761        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
762        let mut to_batch_engine = request.requests;
763
764        let mut batching_row_cnt = 0;
765        let mut streaming_row_cnt = 0;
766
767        {
768            // not locking this, or recover flows will be starved when also handling flow inserts
769            let src_table2flow = self.src_table2flow.read().await;
770            to_batch_engine.retain(|req| {
771                let region_id = RegionId::from(req.region_id);
772                let table_id = region_id.table_id();
773                let is_in_stream = src_table2flow.in_stream(table_id);
774                let is_in_batch = src_table2flow.in_batch(table_id);
775                if is_in_stream {
776                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
777                    to_stream_engine.push(req.clone());
778                }
779                if is_in_batch {
780                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
781                    return true;
782                }
783                if !is_in_batch && !is_in_stream {
784                    // TODO(discord9): also put to centralized logging for flow once it implemented
785                    warn!("Table {} is not any flow's source table", table_id)
786                }
787                false
788            });
789            // drop(src_table2flow);
790            // can't use drop due to https://github.com/rust-lang/rust/pull/128846
791        }
792
793        METRIC_FLOW_ROWS
794            .with_label_values(&["in-streaming"])
795            .inc_by(streaming_row_cnt as u64);
796
797        METRIC_FLOW_ROWS
798            .with_label_values(&["in-batching"])
799            .inc_by(batching_row_cnt as u64);
800
801        let streaming_engine = self.streaming_engine.clone();
802        let stream_handler: JoinHandle<Result<(), Error>> =
803            common_runtime::spawn_global(async move {
804                streaming_engine
805                    .handle_flow_inserts(api::v1::region::InsertRequests {
806                        requests: to_stream_engine,
807                    })
808                    .await?;
809                Ok(())
810            });
811        self.batching_engine
812            .handle_flow_inserts(api::v1::region::InsertRequests {
813                requests: to_batch_engine,
814            })
815            .await?;
816        stream_handler.await.context(JoinTaskSnafu)??;
817
818        Ok(())
819    }
820
821    async fn handle_mark_window_dirty(
822        &self,
823        req: api::v1::flow::DirtyWindowRequests,
824    ) -> Result<(), Error> {
825        self.batching_engine.handle_mark_window_dirty(req).await
826    }
827}
828
829#[async_trait::async_trait]
830impl common_meta::node_manager::Flownode for FlowDualEngine {
831    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
832        let query_ctx = request
833            .header
834            .and_then(|h| h.query_context)
835            .map(|ctx| ctx.into());
836        match request.body {
837            Some(flow_request::Body::Create(CreateRequest {
838                flow_id: Some(task_id),
839                source_table_ids,
840                sink_table_name: Some(sink_table_name),
841                create_if_not_exists,
842                expire_after,
843                eval_interval,
844                comment,
845                sql,
846                mut flow_options,
847                or_replace,
848            })) => {
849                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
850                let sink_table_name = [
851                    sink_table_name.catalog_name,
852                    sink_table_name.schema_name,
853                    sink_table_name.table_name,
854                ];
855                let expire_after = expire_after.map(|e| e.value);
856
857                let eval_schedule = decode_internal_eval_schedule(&mut flow_options)
858                    .map_err(to_meta_err(snafu::location!()))?;
859
860                let args = CreateFlowArgs {
861                    flow_id: task_id.id as u64,
862                    sink_table_name,
863                    source_table_ids,
864                    create_if_not_exists,
865                    or_replace,
866                    expire_after,
867                    eval_interval: eval_interval.map(|e| e.seconds),
868                    comment: Some(comment),
869                    sql: sql.clone(),
870                    flow_options,
871                    query_ctx,
872                    eval_schedule,
873                };
874                let ret = self
875                    .create_flow(args)
876                    .await
877                    .map_err(BoxedError::new)
878                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
879                    .map_err(to_meta_err(snafu::location!()))?;
880                METRIC_FLOW_TASK_COUNT.inc();
881                Ok(FlowResponse {
882                    affected_flows: ret
883                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
884                        .into_iter()
885                        .collect_vec(),
886                    ..Default::default()
887                })
888            }
889            Some(flow_request::Body::Drop(DropRequest {
890                flow_id: Some(flow_id),
891            })) => {
892                self.remove_flow(flow_id.id as u64)
893                    .await
894                    .map_err(to_meta_err(snafu::location!()))?;
895                METRIC_FLOW_TASK_COUNT.dec();
896                Ok(Default::default())
897            }
898            Some(flow_request::Body::Flush(FlushFlow {
899                flow_id: Some(flow_id),
900            })) => {
901                let row = self
902                    .flush_flow(flow_id.id as u64)
903                    .await
904                    .map_err(to_meta_err(snafu::location!()))?;
905                Ok(FlowResponse {
906                    affected_flows: vec![flow_id],
907                    affected_rows: row as u64,
908                    ..Default::default()
909                })
910            }
911            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
912        }
913    }
914
915    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
916        FlowEngine::handle_flow_inserts(self, request)
917            .await
918            .map(|_| Default::default())
919            .map_err(to_meta_err(snafu::location!()))
920    }
921
922    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> MetaResult<FlowResponse> {
923        self.batching_engine()
924            .handle_mark_dirty_time_window(req)
925            .await
926            .map(|_| FlowResponse::default())
927            .map_err(to_meta_err(snafu::location!()))
928    }
929}
930
931/// Decode typed schedule config from the internal transient key emitted by metasrv.
932/// Malformed JSON is an internal error rather than a reason to silently fall back.
933fn decode_internal_eval_schedule(
934    flow_options: &mut HashMap<String, String>,
935) -> Result<Option<FlowScheduleConfig>, Error> {
936    match flow_options.remove(INTERNAL_EVAL_SCHEDULE_KEY) {
937        Some(json) => serde_json::from_str::<FlowScheduleConfig>(&json)
938            .map(Some)
939            .map_err(|err| {
940                InternalSnafu {
941                    reason: format!("Invalid internal eval schedule payload: {err}"),
942                }
943                .build()
944            }),
945        None => Ok(None),
946    }
947}
948
949/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
950fn to_meta_err(
951    location: snafu::Location,
952) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
953    move |err: crate::error::Error| -> common_meta::error::Error {
954        match err {
955            crate::error::Error::FlowNotFound { id, .. } => {
956                common_meta::error::Error::FlowNotFound {
957                    flow_name: format!("flow_id={id}"),
958                    location,
959                }
960            }
961            _ => common_meta::error::Error::External {
962                location,
963                source: BoxedError::new(err),
964            },
965        }
966    }
967}
968
969impl FlowEngine for StreamingEngine {
970    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
971        self.create_flow_inner(args).await
972    }
973
974    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
975        self.remove_flow_inner(flow_id).await
976    }
977
978    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
979        self.flush_flow_inner(flow_id).await
980    }
981
982    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
983        self.flow_exist_inner(flow_id).await
984    }
985
986    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
987        Ok(self
988            .flow_err_collectors
989            .read()
990            .await
991            .keys()
992            .cloned()
993            .collect::<Vec<_>>())
994    }
995
996    async fn handle_flow_inserts(
997        &self,
998        request: api::v1::region::InsertRequests,
999    ) -> Result<(), Error> {
1000        self.handle_inserts_inner(request).await
1001    }
1002
1003    async fn handle_mark_window_dirty(
1004        &self,
1005        _req: api::v1::flow::DirtyWindowRequests,
1006    ) -> Result<(), Error> {
1007        UnsupportedSnafu {
1008            reason: "handle_mark_window_dirty in streaming engine",
1009        }
1010        .fail()
1011    }
1012}
1013
1014/// Simple helper enum for fetching value from row with default value
1015#[derive(Debug, Clone)]
1016enum FetchFromRow {
1017    Idx(usize),
1018    Default(Value),
1019}
1020
1021impl FetchFromRow {
1022    /// Panic if idx is out of bound
1023    fn fetch(&self, row: &repr::Row) -> Value {
1024        match self {
1025            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
1026            FetchFromRow::Default(v) => v.clone(),
1027        }
1028    }
1029}
1030
1031impl StreamingEngine {
1032    async fn handle_inserts_inner(
1033        &self,
1034        request: InsertRequests,
1035    ) -> std::result::Result<(), Error> {
1036        // using try_read to ensure two things:
1037        // 1. flush wouldn't happen until inserts before it is inserted
1038        // 2. inserts happening concurrently with flush wouldn't be block by flush
1039        let _flush_lock = self.flush_lock.try_read();
1040        for write_request in request.requests {
1041            let region_id = write_request.region_id;
1042            let table_id = RegionId::from(region_id).table_id();
1043
1044            let (insert_schema, rows_proto) = write_request
1045                .rows
1046                .map(|r| (r.schema, r.rows))
1047                .unwrap_or_default();
1048
1049            // TODO(discord9): reconsider time assignment mechanism
1050            let now = self.tick_manager.tick();
1051
1052            let (table_types, fetch_order) = {
1053                let ctx = self.node_context.read().await;
1054
1055                // TODO(discord9): also check schema version so that altered table can be reported
1056                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
1057                let default_vals = table_schema
1058                    .default_values
1059                    .iter()
1060                    .zip(table_schema.relation_desc.typ().column_types.iter())
1061                    .map(|(v, ty)| {
1062                        v.as_ref().and_then(|v| {
1063                            match v.create_default(ty.scalar_type(), ty.nullable()) {
1064                                Ok(v) => Some(v),
1065                                Err(err) => {
1066                                    common_telemetry::error!(err; "Failed to create default value");
1067                                    None
1068                                }
1069                            }
1070                        })
1071                    })
1072                    .collect_vec();
1073
1074                let table_types = table_schema
1075                    .relation_desc
1076                    .typ()
1077                    .column_types
1078                    .clone()
1079                    .into_iter()
1080                    .map(|t| t.scalar_type)
1081                    .collect_vec();
1082                let table_col_names = table_schema.relation_desc.names;
1083                let table_col_names = table_col_names
1084                    .iter().enumerate()
1085                    .map(|(idx,name)| match name {
1086                        Some(name) => Ok(name.clone()),
1087                        None => InternalSnafu {
1088                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
1089                        }
1090                        .fail(),
1091                    })
1092                    .collect::<Result<Vec<_>, _>>()?;
1093                let name_to_col = HashMap::<_, _>::from_iter(
1094                    insert_schema
1095                        .iter()
1096                        .enumerate()
1097                        .map(|(i, name)| (&name.column_name, i)),
1098                );
1099
1100                let fetch_order: Vec<FetchFromRow> = table_col_names
1101                    .iter()
1102                    .zip(default_vals)
1103                    .map(|(col_name, col_default_val)| {
1104                        name_to_col
1105                            .get(col_name)
1106                            .copied()
1107                            .map(FetchFromRow::Idx)
1108                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
1109                            .with_context(|| UnexpectedSnafu {
1110                                reason: format!(
1111                                    "Column not found: {}, default_value: {:?}",
1112                                    col_name, col_default_val
1113                                ),
1114                            })
1115                    })
1116                    .try_collect()?;
1117
1118                trace!("Reordering columns: {:?}", fetch_order);
1119                (table_types, fetch_order)
1120            };
1121
1122            // TODO(discord9): use column instead of row
1123            let rows: Vec<DiffRow> = rows_proto
1124                .into_iter()
1125                .map(|r| {
1126                    let r = repr::Row::from(r);
1127                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
1128                    repr::Row::new(reordered)
1129                })
1130                .map(|r| (r, now, 1))
1131                .collect_vec();
1132            if let Err(err) = self
1133                .handle_write_request(region_id.into(), rows, &table_types)
1134                .await
1135            {
1136                let err = BoxedError::new(err);
1137                let flow_ids = self
1138                    .node_context
1139                    .read()
1140                    .await
1141                    .get_flow_ids(table_id)
1142                    .into_iter()
1143                    .flatten()
1144                    .cloned()
1145                    .collect_vec();
1146                let err = InsertIntoFlowSnafu {
1147                    region_id,
1148                    flow_ids,
1149                }
1150                .into_error(err);
1151                common_telemetry::error!(err; "Failed to handle write request");
1152                return Err(err);
1153            }
1154        }
1155        Ok(())
1156    }
1157}
1158
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_meta::ddl::create_flow::INTERNAL_EVAL_SCHEDULE_KEY;

    use super::decode_internal_eval_schedule;
    use crate::error::Error;

    /// A payload that is not valid JSON must surface as `Error::Internal`, and the
    /// internal key must be removed from the options even though decoding failed.
    #[test]
    fn test_malformed_internal_eval_schedule_json_is_error() {
        let mut flow_options = HashMap::from([(
            INTERNAL_EVAL_SCHEDULE_KEY.to_string(),
            "not-json".to_string(),
        )]);

        let err = decode_internal_eval_schedule(&mut flow_options).unwrap_err();

        let is_expected_error = matches!(
            err,
            Error::Internal { reason, .. }
                if reason.contains("Invalid internal eval schedule payload")
        );
        assert!(is_expected_error);
        assert!(!flow_options.contains_key(INTERNAL_EVAL_SCHEDULE_KEY));
    }
}