flow/adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! For getting data from sources and sending results to sinks,
//! and for communicating with other parts of the database.
#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;

// `GREPTIME_TIMESTAMP` is deliberately not used here, so that tables created
// automatically by flow can be distinguished.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Flow config that exists in both standalone and distributed mode
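///
/// # Example
///
/// A hedged sketch of overriding the worker count from a config snippet;
/// the `toml` round-trip here is illustrative, not part of this crate:
///
/// ```ignore
/// let config: FlowConfig = toml::from_str("num_workers = 4").unwrap();
/// assert_eq!(config.num_workers, 4);
/// // omitted fields fall back to `FlowConfig::default()` thanks to `#[serde(default)]`
/// ```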
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for flow node
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            // flownode's query parallelism is set to 1 to throttle flow's
            // queries so that they won't use too much CPU or memory
            query: QueryOptions {
                parallelism: 1,
                allow_query_fallback: false,
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for FlownodeOptions {
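    /// A zero worker count is treated as unset and falls back to half the CPU
    /// count (at least 1). A minimal sketch of the behavior:
    ///
    /// ```ignore
    /// let mut opts = FlownodeOptions::default();
    /// opts.flow.num_workers = 0;
    /// opts.validate_sanitize().unwrap();
    /// assert!(opts.flow.num_workers >= 1);
    /// ```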
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}

/// Arc-ed `StreamingEngine`, cheaper to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// `StreamingEngine` manages the state of all tasks in the flow node, which should be run on the same thread.
///
/// The choice of timestamp is just the current system timestamp for now.
pub struct StreamingEngine {
    /// The handles to the workers that will run the dataflow,
    /// which are `!Send`, so handles are used instead
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// Getting table name and table schema from table info manager
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table name to global id, and table schema
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode; in standalone mode it is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, `read` by `handle_inserts` and `write` by `flush_flow`,
    ///
    /// so that a series of events like `inserts -> flush` can be handled correctly
    flush_lock: RwLock<()>,
    /// Receives a oneshot sender used to send the state size report back
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

/// Building `StreamingEngine`
impl StreamingEngine {
    /// Set the frontend invoker
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create **without** setting `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a streaming engine manager with `num_workers` workers
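    ///
    /// A minimal wiring sketch; `query_engine` and `table_meta` are assumed to be
    /// an existing `QueryEngine` and `TableMetadataManagerRef`, and each returned
    /// `Worker` is `!Send`, so it runs on its own thread (the `run()` loop below
    /// is illustrative):
    ///
    /// ```ignore
    /// let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
    /// for mut worker in workers {
    ///     std::thread::spawn(move || worker.run());
    /// }
    /// ```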
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Add a worker handle to the manager, meaning the corresponding worker is under its management
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}

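/// A batch of rows to be inserted into or deleted from a sink table, each row
/// paired with the timestamp it takes effect at.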
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

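/// Convert output batches into writeback requests, one `DiffRequest::Insert` per
/// batch; each row is paired with timestamp 0 as a placeholder.
///
/// A minimal sketch (`batch` is assumed to be an existing `Batch`):
///
/// ```ignore
/// let reqs = batches_to_rows_req(vec![batch])?;
/// let total_rows: usize = reqs.iter().map(|r| r.len()).sum();
/// ```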
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

/// This impl block contains methods to send writeback requests to the frontend
impl StreamingEngine {
    /// Return the number of requests it made
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // handle the case where both the flow and the sink table no longer exist,
                        // but some output is still in the output buffer
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // extend the `update_at` column if needed:
                                // if the schema expects a millisecond timestamp here and the result row doesn't have it, add it
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // ts col, if auto created
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Generate writeback requests for all sink tables
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the table schema and primary key from the table info source; if the table does not exist, return `None`
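    ///
    /// A sketch of consuming the result (`engine` and `table_name` are assumed to exist):
    ///
    /// ```ignore
    /// if let Some((pk_names, time_index, col_schemas)) =
    ///     engine.fetch_table_pk_schema(&table_name).await?
    /// {
    ///     // `time_index` is the index of the timestamp column, if any
    /// }
    /// ```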
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Return (primary keys, schema, and whether the table has a placeholder timestamp column);
    /// the schema of the table comes from the flow's output plan,
    ///
    /// adjusted to add the `update_at` column and a ts placeholder if needed
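    ///
    /// Roughly, the returned column list has this shape (a sketch, with
    /// `engine` and `desc` assumed to exist):
    ///
    /// ```ignore
    /// // columns from the flow plan's output schema
    /// //   + `update_at` (nullable millisecond timestamp)
    /// //   + `__ts_placeholder` (time index), only when the plan has no time index
    /// let (pks, schema, no_time_index) = engine.adjust_auto_created_table_schema(&desc).await?;
    /// ```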
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // TODO(discord9): consider removing buggy auto create by schema

        // TODO(discord9): use default key from schema
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if there is no time index, add one as a placeholder
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

/// Flow Runtime related methods
impl StreamingEngine {
    /// Start the state report handler, which will receive a sender from the HeartbeatTask to send the state size report back.
    ///
    /// If the heartbeat task is shut down, this future will exit too.
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Run in the `common_runtime` background runtime.
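    ///
    /// A usage sketch with a shutdown channel; `engine` is assumed to be an
    /// existing `Arc<StreamingEngine>`:
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::broadcast::channel(1);
    /// let handle = engine.clone().run_background(Some(rx));
    /// // ... later, request shutdown and wait for the main loop to exit
    /// tx.send(()).unwrap();
    /// handle.await.unwrap();
    /// ```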
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all flow errors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Trigger dataflow running, and then send writeback requests to the source senders.
    ///
    /// Note that this method doesn't handle input mirror requests, as those should be handled by the gRPC server.
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            // determine if we need to shut down
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!("Shutdown channel lagged by {}, meaning multiple shutdown commands have been issued", num);
                    break;
                }
                None => (),
            }

            // for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
            // before triggering a run of the flow's worker
            let wait_for = since_last_run.elapsed();

            // the last run's insert speed
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            // print a trace every `run_per_trace` iterations so that we can see if something is wrong,
            // without getting flooded with traces
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
        // StreamingEngine.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }

    /// Run all available subgraphs in the flow node.
    /// This will try to run all dataflows in this node.
    ///
    /// Set `blocking` to true to wait until the workers finish running,
    /// or false to just trigger a run and return immediately.
    /// Returns the number of rows sent to the workers (inaccurate).
    /// TODO(discord9): add a flag for subgraphs that have had input since the last run
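    ///
    /// A minimal sketch (`engine` assumed to exist); blocking mode waits for the
    /// workers to finish before flushing the send buffers:
    ///
    /// ```ignore
    /// let rows_sent = engine.run_available(true).await?;
    /// ```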
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            // TODO(discord9): consider how to handle errors in individual workers
            worker.run_available(now, blocking).await?;
        }
        // check rows sent and rows remaining in the send buf
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Send write requests to the related source senders.
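    ///
    /// A sketch of forwarding mirrored inserts; `rows` and `datatypes` are assumed
    /// to match the source table's schema:
    ///
    /// ```ignore
    /// engine
    ///     .handle_write_request(RegionId::new(table_id, 0), rows, &datatypes)
    ///     .await?;
    /// ```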
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id,
            rows_len
        );
        Ok(())
    }
}

/// Create & remove flows
impl StreamingEngine {
    /// Remove a flow by its id
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Return the task id if a new task is created, otherwise return `None`.
    ///
    /// Steps to create a task:
    /// 1. parse the query into a typed plan (and optionally parse the expire_after expr)
    /// 2. render source/sink with the output table id and the used input table ids
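    ///
    /// A hedged sketch of the arguments (field values are illustrative only):
    ///
    /// ```ignore
    /// let args = CreateFlowArgs {
    ///     flow_id: 1024,
    ///     sink_table_name,                    // e.g. [catalog, schema, table]
    ///     source_table_ids: vec![src_table_id],
    ///     create_if_not_exists: true,
    ///     or_replace: false,
    ///     expire_after: None,
    ///     comment: None,
    ///     sql: "SELECT count(*) FROM numbers_input".to_string(),
    ///     flow_options: Default::default(),
    ///     query_ctx: None,
    /// };
    /// let created = engine.create_flow_inner(args).await?;
    /// ```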
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // construct an active dataflow state with it
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // check the schema against the actual table schema if it exists;
        // if it doesn't exist, create the sink table immediately
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // for column schemas, only `data_type` needs to be checked for equality,
            // since one can omit the flow's column names when writing the flow query;
            // print a user-friendly error message about mismatches and how to correct them
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                    reason: format!(
                                        "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                        idx,
                                        real.name,
                                        auto.name,
                                        real.data_type,
                                        auto.data_type
                                    ),
                                }
                                .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // if the table is auto created, the last one or two columns should be timestamps (update_at and the ts placeholder)
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatch, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // assign inferred schema to sink table
            // create sink table
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // TODO(discord9): add more than one handle
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // TODO(discord9): load balance?
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

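    /// Flush a flow: drain pending inputs, run the dataflow in blocking mode, and
    /// write the results back, returning the number of writeback requests made.
    ///
    /// A minimal sketch (`engine` and `flow_id` assumed to exist):
    ///
    /// ```ignore
    /// let reqs = engine.flush_flow_inner(flow_id).await?;
    /// ```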
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // lock to make sure writes before the flush are written to the flow,
        // then drop the lock immediately so that subsequent writes aren't blocked
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let writeback_reqs = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?}: {} input rows flushed, {} rows sent, {} writeback requests made",
            flow_id, flushed_input_rows, rows_send, writeback_reqs
        );
        Ok(writeback_reqs)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// FlowTickManager is a manager for the flow tick, which tracks flow execution progress.
///
/// TODO(discord9): find a better way to do this, and avoid exposing the flow tick even to other flows,
/// to avoid TSO coordination mess
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
    start: Instant,
    /// The timestamp when the flow started
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds.
    ///
    /// TODO(discord9): reconsider, since `tick()` requires a monotonic clock and also needs to survive recovery later
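    ///
    /// A sketch of the arithmetic: the wall-clock timestamp taken at startup plus
    /// the monotonic time elapsed since then, in milliseconds (`manager` assumed
    /// to be a `FlowTickManager`):
    ///
    /// ```ignore
    /// let t0 = manager.tick();
    /// let t1 = manager.tick();
    /// assert!(t1 >= t0); // monotonic, unlike raw system time
    /// ```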
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}