flow/adapter/flownode_impl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implements the `FlowNode` trait for `FlowNodeManager` so that standalone mode can call it
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use api::v1::flow::{
    flow_request, CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse,
    FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};

/// Ref to [`FlowDualEngine`]
pub type FlowDualEngineRef = Arc<FlowDualEngine>;

/// Manages both the streaming and batching mode engines,
///
/// including create/drop/flush flow operations,
/// and redirecting insert requests to the appropriate engine.
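///
/// A minimal construction sketch (not compiled as a doc test; it assumes the
/// component handles passed to [`FlowDualEngine::new`] were built elsewhere):
///
/// ```ignore
/// let dual = Arc::new(FlowDualEngine::new(
///     streaming_engine,      // Arc<StreamingEngine>
///     batching_engine,       // Arc<BatchingEngine>
///     flow_metadata_manager, // Arc<FlowMetadataManager>
///     catalog_manager,       // Arc<dyn CatalogManager>
///     plugins,               // Plugins
/// ));
/// // Keep flownode state in sync with the flows recorded in metasrv.
/// dual.start_flow_consistent_check_task().await?;
/// ```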
pub struct FlowDualEngine {
    streaming_engine: Arc<StreamingEngine>,
    batching_engine: Arc<BatchingEngine>,
    /// Helper struct for fast lookup from source table id to flow id and vice versa
    src_table2flow: RwLock<SrcTableToFlow>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    done_recovering: AtomicBool,
}

impl FlowDualEngine {
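    /// Create a dual engine wrapping the given streaming and batching engines.
    ///
    /// The engine starts with `done_recovering == false`; request handling that
    /// depends on recovery waits until [`Self::set_done_recovering`] is called.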
    pub fn new(
        streaming_engine: Arc<StreamingEngine>,
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
            batching_engine,
            src_table2flow: RwLock::new(SrcTableToFlow::default()),
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
            done_recovering: AtomicBool::new(false),
        }
    }

    /// Set `done_recovering` to true,
    /// indicating that the engine is ready to handle requests
    pub fn set_done_recovering(&self) {
        info!("FlowDualEngine done recovering");
        self.done_recovering
            .store(true, std::sync::atomic::Ordering::Release);
    }

    /// Check if `done_recovering` is true
    pub fn is_recover_done(&self) -> bool {
        self.done_recovering
            .load(std::sync::atomic::Ordering::Acquire)
    }

    /// Wait for recovery to finish; this only happens right after the flownode starts
    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
        if self.is_recover_done() {
            return Ok(());
        }

        warn!(
            "FlowDualEngine is not done recovering, {} insert request(s) waiting for recovery",
            waiting_req_cnt
        );
        // wait 3 seconds, check every 1 second
        // TODO(discord9): make this configurable
        let mut retry = 0;
        let max_retry = 3;
        while retry < max_retry && !self.is_recover_done() {
            warn!(
                "FlowDualEngine is not done recovering, retry {} in 1s",
                retry
            );
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            retry += 1;
        }
        if retry == max_retry {
            return FlowNotRecoveredSnafu.fail();
        } else {
            info!("FlowDualEngine is done recovering");
        }
        // TODO(discord9): also put this into centralized logging for flow once it is implemented
        Ok(())
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Determine if the engine is in distributed mode
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }

    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
        self.batching_engine.clone()
    }

    /// In distributed mode, scan periodically (every 1s) until an available frontend is found or the timeout is reached;
    /// in standalone mode, return immediately.
    /// Note that this function returns as soon as any frontend shows up in the cluster info.
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            let elapsed = now.elapsed();
            tokio::time::sleep(sleep_duration).await;
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
        }
    }

    /// Try to sync with the check task; this is only used in drop flow & flush flow, so a flow id is required.
    ///
    /// The sync is needed to make sure flush flow actually gets called.
    async fn try_sync_with_check_task(
        &self,
        flow_id: FlowId,
        allow_drop: bool,
    ) -> Result<(), Error> {
        // this function rarely gets called, so adding some logs is helpful
        info!("Try to sync with check task for flow {}", flow_id);
        let mut retry = 0;
        let max_retry = 10;
        // keep trying to trigger the consistency check
        while retry < max_retry {
            if let Some(task) = self.check_task.lock().await.as_ref() {
                task.trigger(false, allow_drop).await?;
                break;
            }
            retry += 1;
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        if retry == max_retry {
            error!(
                "Can't sync with check task for flow {} with allow_drop={}",
                flow_id, allow_drop
            );
            return SyncCheckTaskSnafu {
                flow_id,
                allow_drop,
            }
            .fail();
        }
        info!("Successfully synced with check task for flow {}", flow_id);

        Ok(())
    }

    /// Check that all flow tasks recorded in metasrv exist on this flownode,
    /// so on startup this creates all missing flow tasks; it is also run repeatedly at an interval by the consistency check task.
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        // use nodeid to determine if this is standalone/distributed mode, and retrieve all flows on this node (in distributed mode) or all flows (in standalone mode)
        let nodeid = self.streaming_engine.node_id;
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            // nodeid is available, so we only need to check flows on this node
            // which also means we are in distributed mode
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            // nodeid is not available, so we need to check all flows
            // which also means we are in standalone mode
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // because recovery should only happen on restart, `create_if_not_exists` and `or_replace` could be arbitrary values (since the flow doesn't exist yet),
                        // but for the sake of consistency and to make sure flow recovery actually happens, we set both to true
                        // (which is also fine, since the check that disallows both being true lives on metasrv and we have already passed it)
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            // or use a default QueryContext with the catalog_name from info
                            // to keep compatibility with older versions
                            .or_else(|| {
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recovered flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows do not exist in flownode for node {:?}, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows do not exist in metadata for node {:?}, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }

    // TODO(discord9): consider syncing this with heartbeat (might become necessary in the future)
    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
        let mut check_task = self.check_task.lock().await;
        ensure!(
            check_task.is_none(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task already exists",
            }
        );
        let task = ConsistentCheckTask::start_check_task(self).await?;
        *check_task = Some(task);
        Ok(())
    }

    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
        info!("Stopping flow consistent check task");
        let mut check_task = self.check_task.lock().await;

        ensure!(
            check_task.is_some(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task does not exist",
            }
        );

        check_task.take().unwrap().stop().await?;
        info!("Stopped flow consistent check task");
        Ok(())
    }

    /// TODO(discord9): also add an `exists` API using the flow metadata manager's `exists` method
    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_metadata_manager
            .flow_info_manager()
            .get(flow_id as u32)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
            .map(|info| info.is_some())
    }
}

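/// Background task that keeps the flows running in [`FlowDualEngine`] consistent with
/// the flows recorded in metadata, by calling [`FlowDualEngine::check_flow_consistent`]
/// periodically and whenever it is explicitly triggered.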
struct ConsistentCheckTask {
    handle: JoinHandle<()>,
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}

impl ConsistentCheckTask {
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let min_refresh_duration = engine
            .batching_engine()
            .batch_opts
            .experimental_min_refresh_duration;
        let frontend_scan_timeout = engine
            .batching_engine()
            .batch_opts
            .experimental_frontend_scan_timeout;
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // first, wait until an available frontend is found
            if let Err(err) = engine
                .wait_for_available_frontend(frontend_scan_timeout)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // then recover flows; on failure, always retry
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    min_refresh_duration.as_secs()
                );
                tokio::time::sleep(min_refresh_duration).await;
            }

            engine.set_done_recovering();

            // then keep checking flows, with configurable allow_create and allow_drop
            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            // each iteration: run one consistency check, notify any waiter, then either
            // exit on shutdown, rerun with flags from an explicit trigger, or rerun
            // conservatively (no create/drop) after a 10s tick
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }

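    /// Trigger one consistency check round with the given `allow_create`/`allow_drop`
    /// flags and wait until that round has completed.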
    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.trigger_tx
            .send((allow_create, allow_drop, tx))
            .await
            .map_err(|_| {
                IllegalCheckTaskStateSnafu {
                    reason: "Failed to send trigger signal",
                }
                .build()
            })?;
        rx.await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to receive trigger signal",
            }
            .build()
        })?;
        Ok(())
    }

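    /// Ask the background loop to shut down, then abort the task handle.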
    async fn stop(self) -> Result<(), Error> {
        self.shutdown_tx.send(()).await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to send shutdown signal",
            }
            .build()
        })?;
        // abort the task, so there is no need to wait for it
        self.handle.abort();
        Ok(())
    }
}

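/// Bookkeeping that maps source table ids to the flows reading from them (split by
/// flow type) and flow ids back to their type and source tables, so incoming inserts
/// can be routed to the right engine.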
#[derive(Default)]
struct SrcTableToFlow {
    /// mapping of table ids to flow ids for streaming mode
    stream: HashMap<TableId, HashSet<FlowId>>,
    /// mapping of table ids to flow ids for batching mode
    batch: HashMap<TableId, HashSet<FlowId>>,
    /// mapping of flow ids to (flow type, source table ids)
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}

impl SrcTableToFlow {
    /// Whether `table_id` is a source table of any streaming-mode flow.
    fn in_stream(&self, table_id: TableId) -> bool {
        self.stream.contains_key(&table_id)
    }

    /// Whether `table_id` is a source table of any batching-mode flow.
    fn in_batch(&self, table_id: TableId) -> bool {
        self.batch.contains_key(&table_id)
    }

    /// Record a new flow and index it under each of its source tables.
    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
        let mapping = match flow_type {
            FlowType::Streaming => &mut self.stream,
            FlowType::Batching => &mut self.batch,
        };

        for src_table in src_table_ids.clone() {
            mapping
                .entry(src_table)
                .and_modify(|flows| {
                    flows.insert(flow_id);
                })
                .or_insert_with(|| {
                    let mut set = HashSet::new();
                    set.insert(flow_id);
                    set
                });
        }
        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
    }

    /// Remove a flow and unlink it from all of its source tables.
    fn remove_flow(&mut self, flow_id: FlowId) {
        let mapping = match self.get_flow_type(flow_id) {
            Some(FlowType::Streaming) => &mut self.stream,
            Some(FlowType::Batching) => &mut self.batch,
            None => return,
        };
        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
            for src_table in src_table_ids {
                if let Some(flows) = mapping.get_mut(&src_table) {
                    flows.remove(&flow_id);
                }
            }
        }
    }

    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
        self.flow_infos
            .get(&flow_id)
            .map(|(flow_type, _)| flow_type)
            .cloned()
    }
}

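// The dual engine dispatches every `FlowEngine` operation to the streaming or batching
// engine according to the flow type tracked in `src_table2flow`.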
impl FlowEngine for FlowDualEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let flow_type = args
            .flow_options
            .get(FlowType::FLOW_TYPE_KEY)
            .map(|s| s.as_str());

        let flow_type = match flow_type {
            Some(FlowType::BATCHING) => FlowType::Batching,
            Some(FlowType::STREAMING) => FlowType::Streaming,
            None => FlowType::Batching,
            Some(flow_type) => {
                return InternalSnafu {
                    reason: format!("Invalid flow type: {}", flow_type),
                }
                .fail()
            }
        };

        let flow_id = args.flow_id;
        let src_table_ids = args.source_table_ids.clone();

        let res = match flow_type {
            FlowType::Batching => self.batching_engine.create_flow(args).await,
            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
        }?;

        self.src_table2flow
            .write()
            .await
            .add_flow(flow_id, flow_type, src_table_ids);

        Ok(res)
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);

        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
            None => {
                // this can happen if the flownode has just restarted and is still creating the flow;
                // since this flow should now be dropped, we need to trigger the consistency check with drop allowed.
                // this relies on the drop flow DDL deleting metadata first, see src/common/meta/src/ddl/drop_flow.rs
                warn!(
                    "Flow {} does not exist in the underlying engine, but exists in metadata",
                    flow_id
                );
                self.try_sync_with_check_task(flow_id, true).await?;

                Ok(())
            }
        }?;
        // remove mapping
        self.src_table2flow.write().await.remove_flow(flow_id);
        Ok(())
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        // sync with the check task
        self.try_sync_with_check_task(flow_id, false).await?;
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
            None => {
                warn!(
                    "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
                );
                Ok(0)
            }
        }
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        // not using `flow_type.is_some()` to make sure the flow actually exists in the underlying engine
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
            None => Ok(false),
        }
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.wait_for_all_flow_recover(request.requests.len())
            .await?;
        // TODO(discord9): make as few clones as possible
        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
        let mut to_batch_engine = request.requests;

        let mut batching_row_cnt = 0;
        let mut streaming_row_cnt = 0;

        {
            // don't hold this lock beyond this block, or flow recovery will be starved while also handling flow inserts
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
                let table_id = region_id.table_id();
                let is_in_stream = src_table2flow.in_stream(table_id);
                let is_in_batch = src_table2flow.in_batch(table_id);
                if is_in_stream {
                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    to_stream_engine.push(req.clone());
                }
                if is_in_batch {
                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    return true;
                }
                if !is_in_batch && !is_in_stream {
                    // TODO(discord9): also put this into centralized logging for flow once it is implemented
                    warn!("Table {} is not any flow's source table", table_id)
                }
                false
            });
            // drop(src_table2flow);
            // can't use drop due to https://github.com/rust-lang/rust/pull/128846
        }

        METRIC_FLOW_ROWS
            .with_label_values(&["in-streaming"])
            .inc_by(streaming_row_cnt as u64);

        METRIC_FLOW_ROWS
            .with_label_values(&["in-batching"])
            .inc_by(batching_row_cnt as u64);

        let streaming_engine = self.streaming_engine.clone();
        let stream_handler: JoinHandle<Result<(), Error>> =
            common_runtime::spawn_global(async move {
                streaming_engine
                    .handle_flow_inserts(api::v1::region::InsertRequests {
                        requests: to_stream_engine,
                    })
                    .await?;
                Ok(())
            });
        self.batching_engine
            .handle_flow_inserts(api::v1::region::InsertRequests {
                requests: to_batch_engine,
            })
            .await?;
        stream_handler.await.context(JoinTaskSnafu)??;

        Ok(())
    }
}

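// gRPC-facing entry points: translate `FlowRequest` bodies into calls on the dual
// engine and map flow errors into meta errors.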
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowDualEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }

    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
        self.batching_engine()
            .handle_mark_dirty_time_window(DirtyWindowRequests {
                requests: vec![req],
            })
            .await
            .map_err(to_meta_err(snafu::location!()))
    }
}

/// Return a function that converts `crate::error::Error` to `common_meta::error::Error`
fn to_meta_err(
    location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
    move |err: crate::error::Error| -> common_meta::error::Error {
        match err {
            crate::error::Error::FlowNotFound { id, .. } => {
                common_meta::error::Error::FlowNotFound {
                    flow_name: format!("flow_id={id}"),
                    location,
                }
            }
            _ => common_meta::error::Error::External {
                location,
                source: BoxedError::new(err),
            },
        }
    }
}

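// Thin delegation: the streaming engine already implements these operations as
// `*_inner` methods.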
impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
}

/// Simple helper enum for fetching a value from a row, falling back to a default value
#[derive(Debug, Clone)]
enum FetchFromRow {
    Idx(usize),
    Default(Value),
}

impl FetchFromRow {
    /// Panics if the index is out of bounds
    fn fetch(&self, row: &repr::Row) -> Value {
        match self {
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

impl StreamingEngine {
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
        // using try_read to ensure two things:
        // 1. a flush won't happen until the inserts issued before it have been applied
        // 2. inserts happening concurrently with a flush won't be blocked by the flush
        let _flush_lock = self.flush_lock.try_read();
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            // TODO(discord9): reconsider the time assignment mechanism
            let now = self.tick_manager.tick();

            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                // TODO(discord9): also check the schema version so that altered tables can be reported
                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                let table_col_names = table_col_names
                    .iter()
                    .enumerate()
                    .map(|(idx, name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals.into_iter())
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

            // TODO(discord9): use columns instead of rows
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}