flow/adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! For getting data from sources and sending results to sinks,
//! and for communicating with other parts of the database.
#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;

// `GREPTIME_TIMESTAMP` is deliberately not used here, so that tables created
// automatically by flow can be distinguished
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Flow config that exists in both standalone and distributed mode
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for flow node
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            // The flownode's query parallelism is set to 1 to throttle flow's
            // queries so they won't use too much CPU or memory
            query: QueryOptions { parallelism: 1 },
            user_provider: None,
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}
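
// For reference, a minimal sketch of how these options might appear in a TOML
// config file (every field is optional thanks to `#[serde(default)]`; the
// section and field names follow the struct fields above, and the values are
// illustrative, not authoritative defaults):
//
// node_id = 14
//
// [flow]
// num_workers = 4
//
// [grpc]
// bind_addr = "127.0.0.1:3004"
//
// [query]
// parallelism = 1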

/// Arc-ed [`StreamingEngine`], cheaper to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// [`StreamingEngine`] manages the state of all tasks in the flow node,
/// which should be run on the same thread.
///
/// The choice of timestamp is just the current system timestamp for now.
pub struct StreamingEngine {
    /// The handles to the workers that will run the dataflow;
    /// the workers are `!Send`, so handles are used
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// Getting table name and table schema from table info manager
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table name to global id, and the table schema
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode; in standalone mode it is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
    ///
    /// So that a series of events like `inserts -> flush` can be handled correctly
    flush_lock: RwLock<()>,
    /// Receives a oneshot sender used to send back state size reports
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

/// Building the [`StreamingEngine`]
impl StreamingEngine {
    /// Set the frontend invoker
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create **without** setting `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a streaming engine with `num_workers` workers
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Add a worker handle to the manager, meaning the corresponding worker is under its management
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}
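
// A minimal sketch of wiring the engine up with its workers (illustrative:
// assumes a `query_engine` and `table_meta` are already in scope, and that
// `Worker::run` drives the worker's event loop; since workers are `!Send`,
// each one gets a dedicated thread):
//
// let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
// for mut worker in workers {
//     std::thread::spawn(move || worker.run());
// }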

#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
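
// A minimal sketch of constructing a `DiffRequest` (assuming `Row::new` takes
// a `Vec<Value>`, as in `batches_to_rows_req` below; the second tuple element
// is the `repr::Timestamp` of the change):
//
// let req = DiffRequest::Insert(vec![(Row::new(vec![Value::from(1i64)]), 0)]);
// assert_eq!(req.len(), 1);
// assert!(!req.is_empty());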

pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

/// This impl block contains methods to send writeback requests to the frontend
impl StreamingEngine {
    /// Return the number of requests it made
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // Deal with the case where both the flow and the sink table no
                        // longer exist, but some output is still in the output buf
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // Extend with the `update_at` col if needed: if the schema
                                // expects a millisecond timestamp here and the result row
                                // doesn't have it, add it
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // Add the placeholder ts col if the table was auto-created
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Generate writeback requests for all sink tables
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the table schema and primary key from the table info source;
    /// if the table does not exist, return `None`
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Return (primary keys, schema, and whether the table has a placeholder timestamp column).
    /// The schema of the table comes from the flow's output plan,
    /// adjusted to add the `update_at` column and a ts placeholder if needed.
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // TODO(discord9): consider removing buggy auto create by schema

        // TODO(discord9): use default key from schema
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // If there is no time index, add one as a placeholder
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}
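
// Worked example (illustrative): a flow plan whose output schema is
// (cnt: Int64) with no time index is adjusted by the method above to
// (cnt: Int64, update_at: TimestampMillisecond, __ts_placeholder: TimestampMillisecond),
// with `__ts_placeholder` marked as the time index and `no_time_index == true`.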

/// Flow Runtime related methods
impl StreamingEngine {
    /// Start the state report handler, which will receive a sender from the HeartbeatTask
    /// to send state size reports back
    ///
    /// If the heartbeat task is shut down, this future will exit too
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Run in the common_runtime background runtime
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all flow errors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Trigger dataflow running, and then send writeback requests to the source sender
    ///
    /// Note that this method doesn't handle input mirror requests, as those should be
    /// handled by the gRPC server
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // Burst mode, so that if we miss a tick, we will run immediately to fully utilize the CPU
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            // Determine whether to shut down
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!("Shutdown channel is lagged by {}, meaning multiple shutdown cmds have been issued", num);
                    break;
                }
                None => (),
            }

            // For now we want to batch rows until there are around `BATCH_SIZE` rows in the
            // send buf before triggering a run of the flow's worker
            let wait_for = since_last_run.elapsed();

            // The last run's insert speed
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // Rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
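
            // Worked example (illustrative numbers): if the last run moved
            // row_cnt = 500 rows in wait_for = 50 ms, then cur_spd = 500 * 1000 / 50
            // = 10_000 rows/s; with a hypothetical BATCH_SIZE of 1_000 rows,
            // new_wait = 1_000 * 1000 / 10_000 = 100 ms, capped at the 1 s default.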

            // Print a trace every `run_per_trace` runs so that we can see if something
            // is wrong without getting flooded with traces
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // The flow is now shut down; drop frontend_invoker early so a ref cycle
        // (in standalone mode) can be prevented:
        // FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }

    /// Run all available subgraphs in the flow node.
    /// This will try to run all dataflows in this node.
    ///
    /// Set `blocking` to true to wait until the workers finish running,
    /// or false to just trigger a run and return immediately.
    /// Returns the number of rows sent to the workers (inaccurate).
    /// TODO(discord9): add a flag for subgraphs that have input since the last run
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            // TODO(discord9): consider how to handle errors in an individual worker
            worker.run_available(now, blocking).await?;
        }
        // Check rows sent and rows remaining in the send buf
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Send a write request to the related source sender
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id,
            rows_len
        );
        Ok(())
    }
}
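
// A minimal sketch of feeding rows into the engine (illustrative: assumes
// `RegionId::new(table_id, region_number)` and that `rows` / `datatypes`
// match the source table's schema):
//
// let region_id = RegionId::new(1024, 0);
// engine.handle_write_request(region_id, rows, &datatypes).await?;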

/// Create & remove flows
impl StreamingEngine {
    /// Remove a flow by its id
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Return the flow id if a new flow is created, otherwise return `None`.
    ///
    /// Steps to create a flow:
    /// 1. parse the query into a typed plan (and optionally parse the `expire_after` expr)
    /// 2. render source/sink with the output table id and the used input table ids
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // Assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // Construct an active dataflow state with it
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // Check the plan schema against the actual table schema if the sink table exists;
        // if it does not exist, create the sink table immediately
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // For column schemas, only `data_type` needs to be checked for equality,
            // since one can omit the flow's column names when writing the flow query.
            // Print a user-friendly error message about mismatches and how to correct them.
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // If the table is auto-created, the last one or two columns should be
                        // timestamps (update_at and the ts placeholder)
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // Assign the inferred schema to the sink table and create it
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // TODO(discord9): add more than one handle
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // TODO(discord9): load balance?
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // Lock to make sure writes before the flush are written to the flow, and drop
        // immediately to prevent subsequent writes from being blocked
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?}: {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// FlowTickManager is a manager for the flow tick, which tracks flow execution progress
///
/// TODO(discord9): find a better way to do it, and avoid exposing the flow tick even to
/// other flows, to avoid TSO coordination mess
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
    start: Instant,
    /// The timestamp when the flow started
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds
    ///
    /// TODO(discord9): reconsider, since `tick()` requires a monotonic clock and also
    /// needs to survive recovery later
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}