flow/adapter/
flownode_impl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Implements the `Flownode` trait for `FlowDualEngine` so that standalone mode can call it directly
16use std::collections::{HashMap, HashSet};
17use std::sync::atomic::AtomicBool;
18use std::sync::Arc;
19
20use api::v1::flow::{
21    flow_request, CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse,
22    FlushFlow,
23};
24use api::v1::region::InsertRequests;
25use catalog::CatalogManager;
26use common_base::Plugins;
27use common_error::ext::BoxedError;
28use common_meta::ddl::create_flow::FlowType;
29use common_meta::error::Result as MetaResult;
30use common_meta::key::flow::FlowMetadataManager;
31use common_runtime::JoinHandle;
32use common_telemetry::{error, info, trace, warn};
33use datatypes::value::Value;
34use futures::TryStreamExt;
35use greptime_proto::v1::flow::DirtyWindowRequest;
36use itertools::Itertools;
37use session::context::QueryContextBuilder;
38use snafu::{ensure, IntoError, OptionExt, ResultExt};
39use store_api::storage::{RegionId, TableId};
40use tokio::sync::{Mutex, RwLock};
41
42use crate::adapter::{CreateFlowArgs, StreamingEngine};
43use crate::batching_mode::engine::BatchingEngine;
44use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
45use crate::engine::FlowEngine;
46use crate::error::{
47    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
48    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
49    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
50};
51use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
52use crate::repr::{self, DiffRow};
53use crate::{Error, FlowId};
54
/// Shared, reference-counted ([`Arc`]) handle to a [`FlowDualEngine`].
pub type FlowDualEngineRef = Arc<FlowDualEngine>;
57
/// Manage both streaming and batching mode engine
///
/// including create/drop/flush flow
/// and redirect insert requests to the appropriate engine
pub struct FlowDualEngine {
    /// Engine that runs streaming-mode flows; its `node_id` is also what tells
    /// distributed mode (`Some`) apart from standalone mode (`None`).
    streaming_engine: Arc<StreamingEngine>,
    /// Engine that runs batching-mode flows.
    batching_engine: Arc<BatchingEngine>,
    /// helper struct for faster query flow by table id or vice versa
    src_table2flow: RwLock<SrcTableToFlow>,
    /// Source of truth for which flows should exist; read by the consistency check.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Used to enumerate catalogs when listing every flow in standalone mode.
    catalog_manager: Arc<dyn CatalogManager>,
    /// Background consistency-check task; `None` until
    /// `start_flow_consistent_check_task` is called and again after it is stopped.
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    /// Plugins handed in at construction time and exposed through `plugins()`.
    plugins: Plugins,
    /// Starts as `false`; flipped to `true` by `set_done_recovering`.
    done_recovering: AtomicBool,
}
73
74impl FlowDualEngine {
75    pub fn new(
76        streaming_engine: Arc<StreamingEngine>,
77        batching_engine: Arc<BatchingEngine>,
78        flow_metadata_manager: Arc<FlowMetadataManager>,
79        catalog_manager: Arc<dyn CatalogManager>,
80        plugins: Plugins,
81    ) -> Self {
82        Self {
83            streaming_engine,
84            batching_engine,
85            src_table2flow: RwLock::new(SrcTableToFlow::default()),
86            flow_metadata_manager,
87            catalog_manager,
88            check_task: Mutex::new(None),
89            plugins,
90            done_recovering: AtomicBool::new(false),
91        }
92    }
93
94    /// Set `done_recovering` to true
95    /// indicate that we are ready to handle requests
96    pub fn set_done_recovering(&self) {
97        info!("FlowDualEngine done recovering");
98        self.done_recovering
99            .store(true, std::sync::atomic::Ordering::Release);
100    }
101
102    /// Check if `done_recovering` is true
103    pub fn is_recover_done(&self) -> bool {
104        self.done_recovering
105            .load(std::sync::atomic::Ordering::Acquire)
106    }
107
108    /// wait for recovering to be done, this will only happen when flownode just started
109    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
110        if self.is_recover_done() {
111            return Ok(());
112        }
113
114        warn!(
115            "FlowDualEngine is not done recovering, {} insert request waiting for recovery",
116            waiting_req_cnt
117        );
118        // wait 3 seconds, check every 1 second
119        // TODO(discord9): make this configurable
120        let mut retry = 0;
121        let max_retry = 3;
122        while retry < max_retry && !self.is_recover_done() {
123            warn!(
124                "FlowDualEngine is not done recovering, retry {} in 1s",
125                retry
126            );
127            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
128            retry += 1;
129        }
130        if retry == max_retry {
131            return FlowNotRecoveredSnafu.fail();
132        } else {
133            info!("FlowDualEngine is done recovering");
134        }
135        // TODO(discord9): also put to centralized logging for flow once it implemented
136        Ok(())
137    }
138
    /// Borrow the plugins this engine was constructed with.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }
142
    /// Determine if the engine is in distributed mode
    ///
    /// Distributed mode is detected by the streaming engine having a `node_id`.
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }
147
148    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
149        self.streaming_engine.clone()
150    }
151
152    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
153        self.batching_engine.clone()
154    }
155
156    /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
157    /// in standalone mode, return immediately
158    /// notice here if any frontend appear in cluster info this function will return immediately
159    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
160        if !self.is_distributed() {
161            return Ok(());
162        }
163        let frontend_client = self.batching_engine().frontend_client.clone();
164        let sleep_duration = std::time::Duration::from_millis(1_000);
165        let now = std::time::Instant::now();
166        loop {
167            let frontend_list = frontend_client.scan_for_frontend().await?;
168            if !frontend_list.is_empty() {
169                let fe_list = frontend_list
170                    .iter()
171                    .map(|(_, info)| &info.peer.addr)
172                    .collect::<Vec<_>>();
173                info!("Available frontend found: {:?}", fe_list);
174                return Ok(());
175            }
176            let elapsed = now.elapsed();
177            tokio::time::sleep(sleep_duration).await;
178            info!("Waiting for available frontend, elapsed={:?}", elapsed);
179            if elapsed >= timeout {
180                return NoAvailableFrontendSnafu {
181                    timeout,
182                    context: "No available frontend found in cluster info",
183                }
184                .fail();
185            }
186        }
187    }
188
189    /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
190    ///
191    /// the need to sync is to make sure flush flow actually get called
192    async fn try_sync_with_check_task(
193        &self,
194        flow_id: FlowId,
195        allow_drop: bool,
196    ) -> Result<(), Error> {
197        // this function rarely get called so adding some log is helpful
198        info!("Try to sync with check task for flow {}", flow_id);
199        let mut retry = 0;
200        let max_retry = 10;
201        // keep trying to trigger consistent check
202        while retry < max_retry {
203            if let Some(task) = self.check_task.lock().await.as_ref() {
204                task.trigger(false, allow_drop).await?;
205                break;
206            }
207            retry += 1;
208            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
209        }
210
211        if retry == max_retry {
212            error!(
213                "Can't sync with check task for flow {} with allow_drop={}",
214                flow_id, allow_drop
215            );
216            return SyncCheckTaskSnafu {
217                flow_id,
218                allow_drop,
219            }
220            .fail();
221        }
222        info!("Successfully sync with check task for flow {}", flow_id);
223
224        Ok(())
225    }
226
    /// Run one consistency pass between the flows recorded in metadata and the flows that
    /// actually exist in the underlying engines.
    ///
    /// * Flows found in metadata but missing from the engines are re-created when
    ///   `allow_create` is true; otherwise they are only reported with a warning.
    /// * Flows found in the engines but missing from metadata are removed when `allow_drop`
    ///   is true; otherwise they are only reported with a warning.
    ///
    /// Failures of individual create/drop calls are logged as warnings and do not fail the
    /// pass.
    ///
    /// # Errors
    ///
    /// Returns an error when listing flows (from metadata, the catalog, or the engines) fails,
    /// or when the info of a flow that should be re-created cannot be loaded from metadata.
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        // use nodeid to determine if this is standalone/distributed mode, and retrieve all flows in this node(in distributed mode)/or all flows(in standalone mode)
        let nodeid = self.streaming_engine.node_id;
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            // nodeid is available, so we only need to check flows on this node
            // which also means we are in distributed mode
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            // nodeid is not available, so we need to check all flows
            // which also means we are in standalone mode
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            // gather the flow ids of every catalog
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        // normalize to `FlowId` and use sets so both differences below are cheap lookups
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        // in metadata, but not in any engine
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        // in an engine, but not in metadata
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                // per-flow creation failures are collected here and reported after the loop
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    // a failure to load the flow info aborts the whole pass (note the `?`s)
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
                        // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
                        // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            // or use default QueryContext with catalog_name from info
                            // to keep compatibility with old version
                            .or_else(|| {
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    // a failed creation is recorded, the remaining flows are still attempted
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recover flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                // creation not allowed in this pass: only report the missing flows
                warn!(
                    "Flows do not exist in flownode for node {:?}, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                // per-flow removal failures are collected here and reported after the loop
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                // dropping not allowed in this pass: only report the extra flows
                warn!(
                    "Flows do not exist in metadata for node {:?}, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }
391
392    // TODO(discord9): consider sync this with heartbeat(might become necessary in the future)
393    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
394        let mut check_task = self.check_task.lock().await;
395        ensure!(
396            check_task.is_none(),
397            IllegalCheckTaskStateSnafu {
398                reason: "Flow consistent check task already exists",
399            }
400        );
401        let task = ConsistentCheckTask::start_check_task(self).await?;
402        *check_task = Some(task);
403        Ok(())
404    }
405
406    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
407        info!("Stopping flow consistent check task");
408        let mut check_task = self.check_task.lock().await;
409
410        ensure!(
411            check_task.is_some(),
412            IllegalCheckTaskStateSnafu {
413                reason: "Flow consistent check task does not exist",
414            }
415        );
416
417        check_task.take().unwrap().stop().await?;
418        info!("Stopped flow consistent check task");
419        Ok(())
420    }
421
422    /// TODO(discord9): also add a `exists` api using flow metadata manager's `exists` method
423    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
424        self.flow_metadata_manager
425            .flow_info_manager()
426            .get(flow_id as u32)
427            .await
428            .map_err(BoxedError::new)
429            .context(ExternalSnafu)
430            .map(|info| info.is_some())
431    }
432}
433
/// Handle to the spawned background task that checks flow consistency.
struct ConsistentCheckTask {
    /// Join handle of the spawned task; aborted by `stop`.
    handle: JoinHandle<()>,
    /// Sends the shutdown signal that makes the check loop exit.
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    /// Requests an immediate check: `(allow_create, allow_drop, done)`, where `done` is
    /// signalled once that check has run.
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}
439
440impl ConsistentCheckTask {
    /// Spawn the background task that recovers flows once and then keeps checking them.
    ///
    /// The spawned task runs three phases:
    /// 1. wait for an available frontend (a failure here is only logged),
    /// 2. recover flows with `allow_create = true, allow_drop = false`, retrying every
    ///    `MIN_REFRESH_DURATION` until it succeeds, then mark the engine as recovered,
    /// 3. loop: run a check, answer the pending trigger (if any), then wait for a shutdown
    ///    signal, a new trigger, or a 10 second timer.
    ///
    /// The returned handle owns the sending ends of the shutdown and trigger channels.
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        // shutdown channel
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        // trigger channel: (allow_create, allow_drop, completion signal)
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // first check if available frontend is found
            if let Err(err) = engine
                .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // then do recover flows, if failed, always retry
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    MIN_REFRESH_DURATION.as_secs()
                );
                tokio::time::sleep(MIN_REFRESH_DURATION).await;
            }

            engine.set_done_recovering();

            // then do check flows, with configurable allow_create and allow_drop
            let (mut allow_create, mut allow_drop) = (false, false);
            // completion signal of the trigger that caused the upcoming check, if any
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                // notify the waiting caller even if the check itself failed;
                // a dropped receiver is ignored
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    // shutdown requested (or the shutdown sender was dropped)
                    _ = rx.recv() => break,
                    // explicit trigger: the next check uses the requested flags
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    // periodic check: report only, never create or drop
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }
497
498    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
499        let (tx, rx) = tokio::sync::oneshot::channel();
500        self.trigger_tx
501            .send((allow_create, allow_drop, tx))
502            .await
503            .map_err(|_| {
504                IllegalCheckTaskStateSnafu {
505                    reason: "Failed to send trigger signal",
506                }
507                .build()
508            })?;
509        rx.await.map_err(|_| {
510            IllegalCheckTaskStateSnafu {
511                reason: "Failed to receive trigger signal",
512            }
513            .build()
514        })?;
515        Ok(())
516    }
517
518    async fn stop(self) -> Result<(), Error> {
519        self.shutdown_tx.send(()).await.map_err(|_| {
520            IllegalCheckTaskStateSnafu {
521                reason: "Failed to send shutdown signal",
522            }
523            .build()
524        })?;
525        // abort so no need to wait
526        self.handle.abort();
527        Ok(())
528    }
529}
530
531#[derive(Default)]
532struct SrcTableToFlow {
533    /// mapping of table ids to flow ids for streaming mode
534    stream: HashMap<TableId, HashSet<FlowId>>,
535    /// mapping of table ids to flow ids for batching mode
536    batch: HashMap<TableId, HashSet<FlowId>>,
537    /// mapping of flow ids to (flow type, source table ids)
538    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
539}
540
541impl SrcTableToFlow {
542    fn in_stream(&self, table_id: TableId) -> bool {
543        self.stream.contains_key(&table_id)
544    }
545    fn in_batch(&self, table_id: TableId) -> bool {
546        self.batch.contains_key(&table_id)
547    }
548    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
549        let mapping = match flow_type {
550            FlowType::Streaming => &mut self.stream,
551            FlowType::Batching => &mut self.batch,
552        };
553
554        for src_table in src_table_ids.clone() {
555            mapping
556                .entry(src_table)
557                .and_modify(|flows| {
558                    flows.insert(flow_id);
559                })
560                .or_insert_with(|| {
561                    let mut set = HashSet::new();
562                    set.insert(flow_id);
563                    set
564                });
565        }
566        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
567    }
568
569    fn remove_flow(&mut self, flow_id: FlowId) {
570        let mapping = match self.get_flow_type(flow_id) {
571            Some(FlowType::Streaming) => &mut self.stream,
572            Some(FlowType::Batching) => &mut self.batch,
573            None => return,
574        };
575        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
576            for src_table in src_table_ids {
577                if let Some(flows) = mapping.get_mut(&src_table) {
578                    flows.remove(&flow_id);
579                }
580            }
581        }
582    }
583
584    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
585        self.flow_infos
586            .get(&flow_id)
587            .map(|(flow_type, _)| flow_type)
588            .cloned()
589    }
590}
591
592impl FlowEngine for FlowDualEngine {
593    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
594        let flow_type = args
595            .flow_options
596            .get(FlowType::FLOW_TYPE_KEY)
597            .map(|s| s.as_str());
598
599        let flow_type = match flow_type {
600            Some(FlowType::BATCHING) => FlowType::Batching,
601            Some(FlowType::STREAMING) => FlowType::Streaming,
602            None => FlowType::Batching,
603            Some(flow_type) => {
604                return InternalSnafu {
605                    reason: format!("Invalid flow type: {}", flow_type),
606                }
607                .fail()
608            }
609        };
610
611        let flow_id = args.flow_id;
612        let src_table_ids = args.source_table_ids.clone();
613
614        let res = match flow_type {
615            FlowType::Batching => self.batching_engine.create_flow(args).await,
616            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
617        }?;
618
619        self.src_table2flow
620            .write()
621            .await
622            .add_flow(flow_id, flow_type, src_table_ids);
623
624        Ok(res)
625    }
626
627    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
628        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
629
630        match flow_type {
631            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
632            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
633            None => {
634                // this can happen if flownode just restart, and is stilling creating the flow
635                // since now that this flow should dropped, we need to trigger the consistent check and allow drop
636                // this rely on drop flow ddl delete metadata first, see src/common/meta/src/ddl/drop_flow.rs
637                warn!(
638                    "Flow {} is not exist in the underlying engine, but exist in metadata",
639                    flow_id
640                );
641                self.try_sync_with_check_task(flow_id, true).await?;
642
643                Ok(())
644            }
645        }?;
646        // remove mapping
647        self.src_table2flow.write().await.remove_flow(flow_id);
648        Ok(())
649    }
650
651    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
652        // sync with check task
653        self.try_sync_with_check_task(flow_id, false).await?;
654        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
655        match flow_type {
656            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
657            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
658            None => {
659                warn!(
660                    "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
661                );
662                Ok(0)
663            }
664        }
665    }
666
667    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
668        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
669        // not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine
670        match flow_type {
671            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
672            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
673            None => Ok(false),
674        }
675    }
676
677    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
678        let stream_flows = self.streaming_engine.list_flows().await?;
679        let batch_flows = self.batching_engine.list_flows().await?;
680
681        Ok(stream_flows.into_iter().chain(batch_flows))
682    }
683
684    async fn handle_flow_inserts(
685        &self,
686        request: api::v1::region::InsertRequests,
687    ) -> Result<(), Error> {
688        self.wait_for_all_flow_recover(request.requests.len())
689            .await?;
690        // TODO(discord9): make as little clone as possible
691        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
692        let mut to_batch_engine = request.requests;
693
694        let mut batching_row_cnt = 0;
695        let mut streaming_row_cnt = 0;
696
697        {
698            // not locking this, or recover flows will be starved when also handling flow inserts
699            let src_table2flow = self.src_table2flow.read().await;
700            to_batch_engine.retain(|req| {
701                let region_id = RegionId::from(req.region_id);
702                let table_id = region_id.table_id();
703                let is_in_stream = src_table2flow.in_stream(table_id);
704                let is_in_batch = src_table2flow.in_batch(table_id);
705                if is_in_stream {
706                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
707                    to_stream_engine.push(req.clone());
708                }
709                if is_in_batch {
710                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
711                    return true;
712                }
713                if !is_in_batch && !is_in_stream {
714                    // TODO(discord9): also put to centralized logging for flow once it implemented
715                    warn!("Table {} is not any flow's source table", table_id)
716                }
717                false
718            });
719            // drop(src_table2flow);
720            // can't use drop due to https://github.com/rust-lang/rust/pull/128846
721        }
722
723        METRIC_FLOW_ROWS
724            .with_label_values(&["in-streaming"])
725            .inc_by(streaming_row_cnt as u64);
726
727        METRIC_FLOW_ROWS
728            .with_label_values(&["in-batching"])
729            .inc_by(batching_row_cnt as u64);
730
731        let streaming_engine = self.streaming_engine.clone();
732        let stream_handler: JoinHandle<Result<(), Error>> =
733            common_runtime::spawn_global(async move {
734                streaming_engine
735                    .handle_flow_inserts(api::v1::region::InsertRequests {
736                        requests: to_stream_engine,
737                    })
738                    .await?;
739                Ok(())
740            });
741        self.batching_engine
742            .handle_flow_inserts(api::v1::region::InsertRequests {
743                requests: to_batch_engine,
744            })
745            .await?;
746        stream_handler.await.context(JoinTaskSnafu)??;
747
748        Ok(())
749    }
750}
751
752#[async_trait::async_trait]
753impl common_meta::node_manager::Flownode for FlowDualEngine {
754    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
755        let query_ctx = request
756            .header
757            .and_then(|h| h.query_context)
758            .map(|ctx| ctx.into());
759        match request.body {
760            Some(flow_request::Body::Create(CreateRequest {
761                flow_id: Some(task_id),
762                source_table_ids,
763                sink_table_name: Some(sink_table_name),
764                create_if_not_exists,
765                expire_after,
766                comment,
767                sql,
768                flow_options,
769                or_replace,
770            })) => {
771                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
772                let sink_table_name = [
773                    sink_table_name.catalog_name,
774                    sink_table_name.schema_name,
775                    sink_table_name.table_name,
776                ];
777                let expire_after = expire_after.map(|e| e.value);
778                let args = CreateFlowArgs {
779                    flow_id: task_id.id as u64,
780                    sink_table_name,
781                    source_table_ids,
782                    create_if_not_exists,
783                    or_replace,
784                    expire_after,
785                    comment: Some(comment),
786                    sql: sql.clone(),
787                    flow_options,
788                    query_ctx,
789                };
790                let ret = self
791                    .create_flow(args)
792                    .await
793                    .map_err(BoxedError::new)
794                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
795                    .map_err(to_meta_err(snafu::location!()))?;
796                METRIC_FLOW_TASK_COUNT.inc();
797                Ok(FlowResponse {
798                    affected_flows: ret
799                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
800                        .into_iter()
801                        .collect_vec(),
802                    ..Default::default()
803                })
804            }
805            Some(flow_request::Body::Drop(DropRequest {
806                flow_id: Some(flow_id),
807            })) => {
808                self.remove_flow(flow_id.id as u64)
809                    .await
810                    .map_err(to_meta_err(snafu::location!()))?;
811                METRIC_FLOW_TASK_COUNT.dec();
812                Ok(Default::default())
813            }
814            Some(flow_request::Body::Flush(FlushFlow {
815                flow_id: Some(flow_id),
816            })) => {
817                let row = self
818                    .flush_flow(flow_id.id as u64)
819                    .await
820                    .map_err(to_meta_err(snafu::location!()))?;
821                Ok(FlowResponse {
822                    affected_flows: vec![flow_id],
823                    affected_rows: row as u64,
824                    ..Default::default()
825                })
826            }
827            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
828        }
829    }
830
831    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
832        FlowEngine::handle_flow_inserts(self, request)
833            .await
834            .map(|_| Default::default())
835            .map_err(to_meta_err(snafu::location!()))
836    }
837
838    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
839        self.batching_engine()
840            .handle_mark_dirty_time_window(DirtyWindowRequests {
841                requests: vec![req],
842            })
843            .await
844            .map_err(to_meta_err(snafu::location!()))
845    }
846}
847
848/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
849fn to_meta_err(
850    location: snafu::Location,
851) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
852    move |err: crate::error::Error| -> common_meta::error::Error {
853        match err {
854            crate::error::Error::FlowNotFound { id, .. } => {
855                common_meta::error::Error::FlowNotFound {
856                    flow_name: format!("flow_id={id}"),
857                    location,
858                }
859            }
860            _ => common_meta::error::Error::External {
861                location,
862                source: BoxedError::new(err),
863            },
864        }
865    }
866}
867
868impl FlowEngine for StreamingEngine {
869    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
870        self.create_flow_inner(args).await
871    }
872
873    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
874        self.remove_flow_inner(flow_id).await
875    }
876
877    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
878        self.flush_flow_inner(flow_id).await
879    }
880
881    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
882        self.flow_exist_inner(flow_id).await
883    }
884
885    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
886        Ok(self
887            .flow_err_collectors
888            .read()
889            .await
890            .keys()
891            .cloned()
892            .collect::<Vec<_>>())
893    }
894
895    async fn handle_flow_inserts(
896        &self,
897        request: api::v1::region::InsertRequests,
898    ) -> Result<(), Error> {
899        self.handle_inserts_inner(request).await
900    }
901}
902
903/// Simple helper enum for fetching value from row with default value
904#[derive(Debug, Clone)]
905enum FetchFromRow {
906    Idx(usize),
907    Default(Value),
908}
909
910impl FetchFromRow {
911    /// Panic if idx is out of bound
912    fn fetch(&self, row: &repr::Row) -> Value {
913        match self {
914            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
915            FetchFromRow::Default(v) => v.clone(),
916        }
917    }
918}
919
920impl StreamingEngine {
921    async fn handle_inserts_inner(
922        &self,
923        request: InsertRequests,
924    ) -> std::result::Result<(), Error> {
925        // using try_read to ensure two things:
926        // 1. flush wouldn't happen until inserts before it is inserted
927        // 2. inserts happening concurrently with flush wouldn't be block by flush
928        let _flush_lock = self.flush_lock.try_read();
929        for write_request in request.requests {
930            let region_id = write_request.region_id;
931            let table_id = RegionId::from(region_id).table_id();
932
933            let (insert_schema, rows_proto) = write_request
934                .rows
935                .map(|r| (r.schema, r.rows))
936                .unwrap_or_default();
937
938            // TODO(discord9): reconsider time assignment mechanism
939            let now = self.tick_manager.tick();
940
941            let (table_types, fetch_order) = {
942                let ctx = self.node_context.read().await;
943
944                // TODO(discord9): also check schema version so that altered table can be reported
945                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
946                let default_vals = table_schema
947                    .default_values
948                    .iter()
949                    .zip(table_schema.relation_desc.typ().column_types.iter())
950                    .map(|(v, ty)| {
951                        v.as_ref().and_then(|v| {
952                            match v.create_default(ty.scalar_type(), ty.nullable()) {
953                                Ok(v) => Some(v),
954                                Err(err) => {
955                                    common_telemetry::error!(err; "Failed to create default value");
956                                    None
957                                }
958                            }
959                        })
960                    })
961                    .collect_vec();
962
963                let table_types = table_schema
964                    .relation_desc
965                    .typ()
966                    .column_types
967                    .clone()
968                    .into_iter()
969                    .map(|t| t.scalar_type)
970                    .collect_vec();
971                let table_col_names = table_schema.relation_desc.names;
972                let table_col_names = table_col_names
973                    .iter().enumerate()
974                    .map(|(idx,name)| match name {
975                        Some(name) => Ok(name.clone()),
976                        None => InternalSnafu {
977                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
978                        }
979                        .fail(),
980                    })
981                    .collect::<Result<Vec<_>, _>>()?;
982                let name_to_col = HashMap::<_, _>::from_iter(
983                    insert_schema
984                        .iter()
985                        .enumerate()
986                        .map(|(i, name)| (&name.column_name, i)),
987                );
988
989                let fetch_order: Vec<FetchFromRow> = table_col_names
990                    .iter()
991                    .zip(default_vals.into_iter())
992                    .map(|(col_name, col_default_val)| {
993                        name_to_col
994                            .get(col_name)
995                            .copied()
996                            .map(FetchFromRow::Idx)
997                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
998                            .with_context(|| UnexpectedSnafu {
999                                reason: format!(
1000                                    "Column not found: {}, default_value: {:?}",
1001                                    col_name, col_default_val
1002                                ),
1003                            })
1004                    })
1005                    .try_collect()?;
1006
1007                trace!("Reordering columns: {:?}", fetch_order);
1008                (table_types, fetch_order)
1009            };
1010
1011            // TODO(discord9): use column instead of row
1012            let rows: Vec<DiffRow> = rows_proto
1013                .into_iter()
1014                .map(|r| {
1015                    let r = repr::Row::from(r);
1016                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
1017                    repr::Row::new(reordered)
1018                })
1019                .map(|r| (r, now, 1))
1020                .collect_vec();
1021            if let Err(err) = self
1022                .handle_write_request(region_id.into(), rows, &table_types)
1023                .await
1024            {
1025                let err = BoxedError::new(err);
1026                let flow_ids = self
1027                    .node_context
1028                    .read()
1029                    .await
1030                    .get_flow_ids(table_id)
1031                    .into_iter()
1032                    .flatten()
1033                    .cloned()
1034                    .collect_vec();
1035                let err = InsertIntoFlowSnafu {
1036                    region_id,
1037                    flow_ids,
1038                }
1039                .into_error(err);
1040                common_telemetry::error!(err; "Failed to handle write request");
1041                return Err(err);
1042            }
1043        }
1044        Ok(())
1045    }
1046}