flow/adapter/
flownode_impl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
16use std::collections::{HashMap, HashSet};
17use std::sync::atomic::AtomicBool;
18use std::sync::Arc;
19
20use api::v1::flow::{
21    flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
22};
23use api::v1::region::InsertRequests;
24use catalog::CatalogManager;
25use common_base::Plugins;
26use common_error::ext::BoxedError;
27use common_meta::ddl::create_flow::FlowType;
28use common_meta::error::Result as MetaResult;
29use common_meta::key::flow::FlowMetadataManager;
30use common_runtime::JoinHandle;
31use common_telemetry::{error, info, trace, warn};
32use datatypes::value::Value;
33use futures::TryStreamExt;
34use itertools::Itertools;
35use session::context::QueryContextBuilder;
36use snafu::{ensure, IntoError, OptionExt, ResultExt};
37use store_api::storage::{RegionId, TableId};
38use tokio::sync::{Mutex, RwLock};
39
40use crate::adapter::{CreateFlowArgs, StreamingEngine};
41use crate::batching_mode::engine::BatchingEngine;
42use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
43use crate::engine::FlowEngine;
44use crate::error::{
45    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
46    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
47    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
48};
49use crate::metrics::METRIC_FLOW_TASK_COUNT;
50use crate::repr::{self, DiffRow};
51use crate::{Error, FlowId};
52
/// Shared, reference-counted handle to a [`FlowDualEngine`].
pub type FlowDualEngineRef = Arc<FlowDualEngine>;
55
/// Manages both the streaming mode and the batching mode engine.
///
/// This includes creating/dropping/flushing flows
/// and redirecting insert requests to the appropriate engine.
pub struct FlowDualEngine {
    /// engine executing flows in streaming mode
    streaming_engine: Arc<StreamingEngine>,
    /// engine executing flows in batching mode
    batching_engine: Arc<BatchingEngine>,
    /// helper struct for faster query flow by table id or vice versa
    src_table2flow: RwLock<SrcTableToFlow>,
    /// flow metadata, consulted by the consistency check to learn which flows should exist
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// used to enumerate catalogs when no node id is set and all flows have to be listed
    catalog_manager: Arc<dyn CatalogManager>,
    /// the background consistency check task, `None` until it is started
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    /// set to true by `set_done_recovering`; insert requests wait for it
    done_recovering: AtomicBool,
}
71
72impl FlowDualEngine {
73    pub fn new(
74        streaming_engine: Arc<StreamingEngine>,
75        batching_engine: Arc<BatchingEngine>,
76        flow_metadata_manager: Arc<FlowMetadataManager>,
77        catalog_manager: Arc<dyn CatalogManager>,
78        plugins: Plugins,
79    ) -> Self {
80        Self {
81            streaming_engine,
82            batching_engine,
83            src_table2flow: RwLock::new(SrcTableToFlow::default()),
84            flow_metadata_manager,
85            catalog_manager,
86            check_task: Mutex::new(None),
87            plugins,
88            done_recovering: AtomicBool::new(false),
89        }
90    }
91
92    /// Set `done_recovering` to true
93    /// indicate that we are ready to handle requests
94    pub fn set_done_recovering(&self) {
95        info!("FlowDualEngine done recovering");
96        self.done_recovering
97            .store(true, std::sync::atomic::Ordering::Release);
98    }
99
100    /// Check if `done_recovering` is true
101    pub fn is_recover_done(&self) -> bool {
102        self.done_recovering
103            .load(std::sync::atomic::Ordering::Acquire)
104    }
105
106    /// wait for recovering to be done, this will only happen when flownode just started
107    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
108        if self.is_recover_done() {
109            return Ok(());
110        }
111
112        warn!(
113            "FlowDualEngine is not done recovering, {} insert request waiting for recovery",
114            waiting_req_cnt
115        );
116        // wait 3 seconds, check every 1 second
117        // TODO(discord9): make this configurable
118        let mut retry = 0;
119        let max_retry = 3;
120        while retry < max_retry && !self.is_recover_done() {
121            warn!(
122                "FlowDualEngine is not done recovering, retry {} in 1s",
123                retry
124            );
125            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
126            retry += 1;
127        }
128        if retry == max_retry {
129            return FlowNotRecoveredSnafu.fail();
130        } else {
131            info!("FlowDualEngine is done recovering");
132        }
133        // TODO(discord9): also put to centralized logging for flow once it implemented
134        Ok(())
135    }
136
    /// Borrow the plugins this engine was constructed with
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }
140
    /// Determine if the engine is in distributed mode,
    /// which is the case when the streaming engine has a node id
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }
145
146    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
147        self.streaming_engine.clone()
148    }
149
150    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
151        self.batching_engine.clone()
152    }
153
154    /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
155    /// in standalone mode, return immediately
156    /// notice here if any frontend appear in cluster info this function will return immediately
157    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
158        if !self.is_distributed() {
159            return Ok(());
160        }
161        let frontend_client = self.batching_engine().frontend_client.clone();
162        let sleep_duration = std::time::Duration::from_millis(1_000);
163        let now = std::time::Instant::now();
164        loop {
165            let frontend_list = frontend_client.scan_for_frontend().await?;
166            if !frontend_list.is_empty() {
167                let fe_list = frontend_list
168                    .iter()
169                    .map(|(_, info)| &info.peer.addr)
170                    .collect::<Vec<_>>();
171                info!("Available frontend found: {:?}", fe_list);
172                return Ok(());
173            }
174            let elapsed = now.elapsed();
175            tokio::time::sleep(sleep_duration).await;
176            info!("Waiting for available frontend, elapsed={:?}", elapsed);
177            if elapsed >= timeout {
178                return NoAvailableFrontendSnafu {
179                    timeout,
180                    context: "No available frontend found in cluster info",
181                }
182                .fail();
183            }
184        }
185    }
186
187    /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
188    ///
189    /// the need to sync is to make sure flush flow actually get called
190    async fn try_sync_with_check_task(
191        &self,
192        flow_id: FlowId,
193        allow_drop: bool,
194    ) -> Result<(), Error> {
195        // this function rarely get called so adding some log is helpful
196        info!("Try to sync with check task for flow {}", flow_id);
197        let mut retry = 0;
198        let max_retry = 10;
199        // keep trying to trigger consistent check
200        while retry < max_retry {
201            if let Some(task) = self.check_task.lock().await.as_ref() {
202                task.trigger(false, allow_drop).await?;
203                break;
204            }
205            retry += 1;
206            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
207        }
208
209        if retry == max_retry {
210            error!(
211                "Can't sync with check task for flow {} with allow_drop={}",
212                flow_id, allow_drop
213            );
214            return SyncCheckTaskSnafu {
215                flow_id,
216                allow_drop,
217            }
218            .fail();
219        }
220        info!("Successfully sync with check task for flow {}", flow_id);
221
222        Ok(())
223    }
224
    /// Run one consistency pass between the flows recorded in metadata and the flows
    /// that actually exist in the streaming/batching engines of this flownode.
    ///
    /// - flows in metadata but not in the engines are (re)created if `allow_create`,
    ///   otherwise they are only logged
    /// - flows in the engines but not in metadata are dropped if `allow_drop`,
    ///   otherwise they are only logged
    ///
    /// Failing to list flows or to fetch the info of a flow aborts the pass with an error,
    /// while failures of an individual create/drop are only logged as warnings and the
    /// pass still returns `Ok(())`.
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        // use nodeid to determine if this is standalone/distributed mode, and retrieve all flows in this node(in distributed mode)/or all flows(in standalone mode)
        let nodeid = self.streaming_engine.node_id;
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            // nodeid is available, so we only need to check flows on this node
            // which also means we are in distributed mode
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            // nodeid is not available, so we need to check all flows
            // which also means we are in standalone mode
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        // flows that metadata says should exist
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        // flows that both engines currently report
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                // create failures are collected and logged below instead of aborting the pass
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
                        // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
                        // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            // or use default QueryContext with catalog_name from info
                            // to keep compatibility with old version
                            .or_else(|| {
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recover flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows do not exist in flownode for node {:?}, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                // drop failures are collected and logged below instead of aborting the pass
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows do not exist in metadata for node {:?}, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }
389
390    // TODO(discord9): consider sync this with heartbeat(might become necessary in the future)
391    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
392        let mut check_task = self.check_task.lock().await;
393        ensure!(
394            check_task.is_none(),
395            IllegalCheckTaskStateSnafu {
396                reason: "Flow consistent check task already exists",
397            }
398        );
399        let task = ConsistentCheckTask::start_check_task(self).await?;
400        *check_task = Some(task);
401        Ok(())
402    }
403
404    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
405        info!("Stopping flow consistent check task");
406        let mut check_task = self.check_task.lock().await;
407
408        ensure!(
409            check_task.is_some(),
410            IllegalCheckTaskStateSnafu {
411                reason: "Flow consistent check task does not exist",
412            }
413        );
414
415        check_task.take().unwrap().stop().await?;
416        info!("Stopped flow consistent check task");
417        Ok(())
418    }
419
420    /// TODO(discord9): also add a `exists` api using flow metadata manager's `exists` method
421    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
422        self.flow_metadata_manager
423            .flow_info_manager()
424            .get(flow_id as u32)
425            .await
426            .map_err(BoxedError::new)
427            .context(ExternalSnafu)
428            .map(|info| info.is_some())
429    }
430}
431
/// Handle to the background task that checks flows against metadata
struct ConsistentCheckTask {
    /// join handle of the spawned check loop, aborted on `stop`
    handle: JoinHandle<()>,
    /// tells the check loop to exit
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    /// requests an immediate check as `(allow_create, allow_drop, done)`,
    /// `done` is signalled once that check has finished
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}
437
438impl ConsistentCheckTask {
    /// Spawn the background check loop for `engine` and return the handle to control it.
    ///
    /// The spawned task
    /// 1. waits for an available frontend (a timeout here is only logged),
    /// 2. recovers flows with `allow_create = true`, retrying until one pass succeeds,
    /// 3. marks the engine as done recovering,
    /// 4. then keeps checking: after every check it waits for a shutdown signal, a
    ///    trigger (which provides the flags for the next check) or a 10 seconds timer
    ///    (which resets both flags to false).
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        // shutdown channel
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        // channel for on-demand checks, see `trigger`
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // first check if available frontend is found
            if let Err(err) = engine
                .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // then do recover flows, if failed, always retry
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    MIN_REFRESH_DURATION.as_secs()
                );
                tokio::time::sleep(MIN_REFRESH_DURATION).await;
            }

            engine.set_done_recovering();

            // then do check flows, with configurable allow_create and allow_drop
            let (mut allow_create, mut allow_drop) = (false, false);
            // completion signal of the trigger that requested the upcoming check, if any
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                // notify the waiting `trigger` call even if the check failed
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }
495
496    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
497        let (tx, rx) = tokio::sync::oneshot::channel();
498        self.trigger_tx
499            .send((allow_create, allow_drop, tx))
500            .await
501            .map_err(|_| {
502                IllegalCheckTaskStateSnafu {
503                    reason: "Failed to send trigger signal",
504                }
505                .build()
506            })?;
507        rx.await.map_err(|_| {
508            IllegalCheckTaskStateSnafu {
509                reason: "Failed to receive trigger signal",
510            }
511            .build()
512        })?;
513        Ok(())
514    }
515
516    async fn stop(self) -> Result<(), Error> {
517        self.shutdown_tx.send(()).await.map_err(|_| {
518            IllegalCheckTaskStateSnafu {
519                reason: "Failed to send shutdown signal",
520            }
521            .build()
522        })?;
523        // abort so no need to wait
524        self.handle.abort();
525        Ok(())
526    }
527}
528
529#[derive(Default)]
530struct SrcTableToFlow {
531    /// mapping of table ids to flow ids for streaming mode
532    stream: HashMap<TableId, HashSet<FlowId>>,
533    /// mapping of table ids to flow ids for batching mode
534    batch: HashMap<TableId, HashSet<FlowId>>,
535    /// mapping of flow ids to (flow type, source table ids)
536    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
537}
538
539impl SrcTableToFlow {
540    fn in_stream(&self, table_id: TableId) -> bool {
541        self.stream.contains_key(&table_id)
542    }
543    fn in_batch(&self, table_id: TableId) -> bool {
544        self.batch.contains_key(&table_id)
545    }
546    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
547        let mapping = match flow_type {
548            FlowType::Streaming => &mut self.stream,
549            FlowType::Batching => &mut self.batch,
550        };
551
552        for src_table in src_table_ids.clone() {
553            mapping
554                .entry(src_table)
555                .and_modify(|flows| {
556                    flows.insert(flow_id);
557                })
558                .or_insert_with(|| {
559                    let mut set = HashSet::new();
560                    set.insert(flow_id);
561                    set
562                });
563        }
564        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
565    }
566
567    fn remove_flow(&mut self, flow_id: FlowId) {
568        let mapping = match self.get_flow_type(flow_id) {
569            Some(FlowType::Streaming) => &mut self.stream,
570            Some(FlowType::Batching) => &mut self.batch,
571            None => return,
572        };
573        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
574            for src_table in src_table_ids {
575                if let Some(flows) = mapping.get_mut(&src_table) {
576                    flows.remove(&flow_id);
577                }
578            }
579        }
580    }
581
582    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
583        self.flow_infos
584            .get(&flow_id)
585            .map(|(flow_type, _)| flow_type)
586            .cloned()
587    }
588}
589
590impl FlowEngine for FlowDualEngine {
591    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
592        let flow_type = args
593            .flow_options
594            .get(FlowType::FLOW_TYPE_KEY)
595            .map(|s| s.as_str());
596
597        let flow_type = match flow_type {
598            Some(FlowType::BATCHING) => FlowType::Batching,
599            Some(FlowType::STREAMING) => FlowType::Streaming,
600            None => FlowType::Batching,
601            Some(flow_type) => {
602                return InternalSnafu {
603                    reason: format!("Invalid flow type: {}", flow_type),
604                }
605                .fail()
606            }
607        };
608
609        let flow_id = args.flow_id;
610        let src_table_ids = args.source_table_ids.clone();
611
612        let res = match flow_type {
613            FlowType::Batching => self.batching_engine.create_flow(args).await,
614            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
615        }?;
616
617        self.src_table2flow
618            .write()
619            .await
620            .add_flow(flow_id, flow_type, src_table_ids);
621
622        Ok(res)
623    }
624
625    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
626        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
627
628        match flow_type {
629            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
630            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
631            None => {
632                // this can happen if flownode just restart, and is stilling creating the flow
633                // since now that this flow should dropped, we need to trigger the consistent check and allow drop
634                // this rely on drop flow ddl delete metadata first, see src/common/meta/src/ddl/drop_flow.rs
635                warn!(
636                    "Flow {} is not exist in the underlying engine, but exist in metadata",
637                    flow_id
638                );
639                self.try_sync_with_check_task(flow_id, true).await?;
640
641                Ok(())
642            }
643        }?;
644        // remove mapping
645        self.src_table2flow.write().await.remove_flow(flow_id);
646        Ok(())
647    }
648
649    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
650        // sync with check task
651        self.try_sync_with_check_task(flow_id, false).await?;
652        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
653        match flow_type {
654            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
655            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
656            None => {
657                warn!(
658                    "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
659                );
660                Ok(0)
661            }
662        }
663    }
664
665    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
666        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
667        // not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine
668        match flow_type {
669            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
670            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
671            None => Ok(false),
672        }
673    }
674
    /// List the flow ids of both engines, streaming flows first followed by batching flows
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }
681
682    async fn handle_flow_inserts(
683        &self,
684        request: api::v1::region::InsertRequests,
685    ) -> Result<(), Error> {
686        self.wait_for_all_flow_recover(request.requests.len())
687            .await?;
688        // TODO(discord9): make as little clone as possible
689        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
690        let mut to_batch_engine = request.requests;
691
692        {
693            // not locking this, or recover flows will be starved when also handling flow inserts
694            let src_table2flow = self.src_table2flow.read().await;
695            to_batch_engine.retain(|req| {
696                let region_id = RegionId::from(req.region_id);
697                let table_id = region_id.table_id();
698                let is_in_stream = src_table2flow.in_stream(table_id);
699                let is_in_batch = src_table2flow.in_batch(table_id);
700                if is_in_stream {
701                    to_stream_engine.push(req.clone());
702                }
703                if is_in_batch {
704                    return true;
705                }
706                if !is_in_batch && !is_in_stream {
707                    // TODO(discord9): also put to centralized logging for flow once it implemented
708                    warn!("Table {} is not any flow's source table", table_id)
709                }
710                false
711            });
712            // drop(src_table2flow);
713            // can't use drop due to https://github.com/rust-lang/rust/pull/128846
714        }
715
716        let streaming_engine = self.streaming_engine.clone();
717        let stream_handler: JoinHandle<Result<(), Error>> =
718            common_runtime::spawn_global(async move {
719                streaming_engine
720                    .handle_flow_inserts(api::v1::region::InsertRequests {
721                        requests: to_stream_engine,
722                    })
723                    .await?;
724                Ok(())
725            });
726        self.batching_engine
727            .handle_flow_inserts(api::v1::region::InsertRequests {
728                requests: to_batch_engine,
729            })
730            .await?;
731        stream_handler.await.context(JoinTaskSnafu)??;
732
733        Ok(())
734    }
735}
736
737#[async_trait::async_trait]
738impl common_meta::node_manager::Flownode for FlowDualEngine {
    /// Dispatch a [`FlowRequest`] to `create_flow`, `remove_flow` or `flush_flow`.
    ///
    /// Only create requests with both a flow id and a sink table name, and drop/flush
    /// requests with a flow id are accepted; every other body is rejected with
    /// `InvalidFlowRequestBody`. `METRIC_FLOW_TASK_COUNT` is incremented after a
    /// successful create and decremented after a successful drop.
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        // query context of the request header, if any, is handed to `create_flow`
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                // [catalog, schema, table]
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                // `affected_flows` is empty when `create_flow` returned `None`
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            // missing body, unknown body, or a body lacking its required ids
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }
815
816    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
817        FlowEngine::handle_flow_inserts(self, request)
818            .await
819            .map(|_| Default::default())
820            .map_err(to_meta_err(snafu::location!()))
821    }
822}
823
824/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
825fn to_meta_err(
826    location: snafu::Location,
827) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
828    move |err: crate::error::Error| -> common_meta::error::Error {
829        match err {
830            crate::error::Error::FlowNotFound { id, .. } => {
831                common_meta::error::Error::FlowNotFound {
832                    flow_name: format!("flow_id={id}"),
833                    location,
834                }
835            }
836            _ => common_meta::error::Error::External {
837                location,
838                source: BoxedError::new(err),
839            },
840        }
841    }
842}
843
844#[async_trait::async_trait]
845impl common_meta::node_manager::Flownode for StreamingEngine {
846    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
847        let query_ctx = request
848            .header
849            .and_then(|h| h.query_context)
850            .map(|ctx| ctx.into());
851        match request.body {
852            Some(flow_request::Body::Create(CreateRequest {
853                flow_id: Some(task_id),
854                source_table_ids,
855                sink_table_name: Some(sink_table_name),
856                create_if_not_exists,
857                expire_after,
858                comment,
859                sql,
860                flow_options,
861                or_replace,
862            })) => {
863                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
864                let sink_table_name = [
865                    sink_table_name.catalog_name,
866                    sink_table_name.schema_name,
867                    sink_table_name.table_name,
868                ];
869                let expire_after = expire_after.map(|e| e.value);
870                let args = CreateFlowArgs {
871                    flow_id: task_id.id as u64,
872                    sink_table_name,
873                    source_table_ids,
874                    create_if_not_exists,
875                    or_replace,
876                    expire_after,
877                    comment: Some(comment),
878                    sql: sql.clone(),
879                    flow_options,
880                    query_ctx,
881                };
882                let ret = self
883                    .create_flow(args)
884                    .await
885                    .map_err(BoxedError::new)
886                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
887                    .map_err(to_meta_err(snafu::location!()))?;
888                METRIC_FLOW_TASK_COUNT.inc();
889                Ok(FlowResponse {
890                    affected_flows: ret
891                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
892                        .into_iter()
893                        .collect_vec(),
894                    ..Default::default()
895                })
896            }
897            Some(flow_request::Body::Drop(DropRequest {
898                flow_id: Some(flow_id),
899            })) => {
900                self.remove_flow(flow_id.id as u64)
901                    .await
902                    .map_err(to_meta_err(snafu::location!()))?;
903                METRIC_FLOW_TASK_COUNT.dec();
904                Ok(Default::default())
905            }
906            Some(flow_request::Body::Flush(FlushFlow {
907                flow_id: Some(flow_id),
908            })) => {
909                let row = self
910                    .flush_flow_inner(flow_id.id as u64)
911                    .await
912                    .map_err(to_meta_err(snafu::location!()))?;
913                Ok(FlowResponse {
914                    affected_flows: vec![flow_id],
915                    affected_rows: row as u64,
916                    ..Default::default()
917                })
918            }
919            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
920        }
921    }
922
923    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
924        self.handle_inserts_inner(request)
925            .await
926            .map(|_| Default::default())
927            .map_err(to_meta_err(snafu::location!()))
928    }
929}
930
931impl FlowEngine for StreamingEngine {
932    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
933        self.create_flow_inner(args).await
934    }
935
936    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
937        self.remove_flow_inner(flow_id).await
938    }
939
940    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
941        self.flush_flow_inner(flow_id).await
942    }
943
944    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
945        self.flow_exist_inner(flow_id).await
946    }
947
948    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
949        Ok(self
950            .flow_err_collectors
951            .read()
952            .await
953            .keys()
954            .cloned()
955            .collect::<Vec<_>>())
956    }
957
958    async fn handle_flow_inserts(
959        &self,
960        request: api::v1::region::InsertRequests,
961    ) -> Result<(), Error> {
962        self.handle_inserts_inner(request).await
963    }
964}
965
966/// Simple helper enum for fetching value from row with default value
967#[derive(Debug, Clone)]
968enum FetchFromRow {
969    Idx(usize),
970    Default(Value),
971}
972
973impl FetchFromRow {
974    /// Panic if idx is out of bound
975    fn fetch(&self, row: &repr::Row) -> Value {
976        match self {
977            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
978            FetchFromRow::Default(v) => v.clone(),
979        }
980    }
981}
982
983impl StreamingEngine {
984    async fn handle_inserts_inner(
985        &self,
986        request: InsertRequests,
987    ) -> std::result::Result<(), Error> {
988        // using try_read to ensure two things:
989        // 1. flush wouldn't happen until inserts before it is inserted
990        // 2. inserts happening concurrently with flush wouldn't be block by flush
991        let _flush_lock = self.flush_lock.try_read();
992        for write_request in request.requests {
993            let region_id = write_request.region_id;
994            let table_id = RegionId::from(region_id).table_id();
995
996            let (insert_schema, rows_proto) = write_request
997                .rows
998                .map(|r| (r.schema, r.rows))
999                .unwrap_or_default();
1000
1001            // TODO(discord9): reconsider time assignment mechanism
1002            let now = self.tick_manager.tick();
1003
1004            let (table_types, fetch_order) = {
1005                let ctx = self.node_context.read().await;
1006
1007                // TODO(discord9): also check schema version so that altered table can be reported
1008                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
1009                let default_vals = table_schema
1010                    .default_values
1011                    .iter()
1012                    .zip(table_schema.relation_desc.typ().column_types.iter())
1013                    .map(|(v, ty)| {
1014                        v.as_ref().and_then(|v| {
1015                            match v.create_default(ty.scalar_type(), ty.nullable()) {
1016                                Ok(v) => Some(v),
1017                                Err(err) => {
1018                                    common_telemetry::error!(err; "Failed to create default value");
1019                                    None
1020                                }
1021                            }
1022                        })
1023                    })
1024                    .collect_vec();
1025
1026                let table_types = table_schema
1027                    .relation_desc
1028                    .typ()
1029                    .column_types
1030                    .clone()
1031                    .into_iter()
1032                    .map(|t| t.scalar_type)
1033                    .collect_vec();
1034                let table_col_names = table_schema.relation_desc.names;
1035                let table_col_names = table_col_names
1036                    .iter().enumerate()
1037                    .map(|(idx,name)| match name {
1038                        Some(name) => Ok(name.clone()),
1039                        None => InternalSnafu {
1040                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
1041                        }
1042                        .fail(),
1043                    })
1044                    .collect::<Result<Vec<_>, _>>()?;
1045                let name_to_col = HashMap::<_, _>::from_iter(
1046                    insert_schema
1047                        .iter()
1048                        .enumerate()
1049                        .map(|(i, name)| (&name.column_name, i)),
1050                );
1051
1052                let fetch_order: Vec<FetchFromRow> = table_col_names
1053                    .iter()
1054                    .zip(default_vals.into_iter())
1055                    .map(|(col_name, col_default_val)| {
1056                        name_to_col
1057                            .get(col_name)
1058                            .copied()
1059                            .map(FetchFromRow::Idx)
1060                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
1061                            .with_context(|| UnexpectedSnafu {
1062                                reason: format!(
1063                                    "Column not found: {}, default_value: {:?}",
1064                                    col_name, col_default_val
1065                                ),
1066                            })
1067                    })
1068                    .try_collect()?;
1069
1070                trace!("Reordering columns: {:?}", fetch_order);
1071                (table_types, fetch_order)
1072            };
1073
1074            // TODO(discord9): use column instead of row
1075            let rows: Vec<DiffRow> = rows_proto
1076                .into_iter()
1077                .map(|r| {
1078                    let r = repr::Row::from(r);
1079                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
1080                    repr::Row::new(reordered)
1081                })
1082                .map(|r| (r, now, 1))
1083                .collect_vec();
1084            if let Err(err) = self
1085                .handle_write_request(region_id.into(), rows, &table_types)
1086                .await
1087            {
1088                let err = BoxedError::new(err);
1089                let flow_ids = self
1090                    .node_context
1091                    .read()
1092                    .await
1093                    .get_flow_ids(table_id)
1094                    .into_iter()
1095                    .flatten()
1096                    .cloned()
1097                    .collect_vec();
1098                let err = InsertIntoFlowSnafu {
1099                    region_id,
1100                    flow_ids,
1101                }
1102                .into_error(err);
1103                common_telemetry::error!(err; "Failed to handle write request");
1104                return Err(err);
1105            }
1106        }
1107        Ok(())
1108    }
1109}