flow/adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! For getting data from sources and sending results to sinks,
//! and for communicating with other parts of the database.
#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_stat::get_total_cpu_cores;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{Mutex, RwLock, broadcast, watch};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::FrontendInvoker;
use crate::error::Error;
use crate::utils::StateReportHandler;

/// The placeholder timestamp column name. `GREPTIME_TIMESTAMP` is deliberately not used here,
/// so that tables created automatically by flow can be distinguished.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Flow config that exists in both standalone and distributed mode
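///
/// A minimal construction sketch (the worker count shown is illustrative only):
/// ```ignore
/// let config = FlowConfig {
///     num_workers: 4,
///     ..Default::default()
/// };
/// ```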
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (get_total_cpu_cores() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for the flow node
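///
/// A hedged sketch of the TOML these options may be loaded from through
/// [`Configurable`] (key names mirror the fields below; the values and the
/// exact `grpc` key layout are illustrative assumptions):
/// ```ignore
/// node_id = 14
///
/// [flow]
/// num_workers = 4
///
/// [grpc]
/// bind_addr = "127.0.0.1:3004"
/// ```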
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            // flownode's query parallelism is set to 1 to throttle flow's queries,
            // so that they won't use too much CPU or memory
            query: QueryOptions {
                parallelism: 1,
                allow_query_fallback: false,
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
        }
        Ok(())
    }
}

/// Arc-wrapped [`StreamingEngine`], cheap to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// StreamingEngine manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using the current system timestamp for now
pub struct StreamingEngine {
    /// The handles to the workers that will run the dataflow,
    /// which are `!Send` so handles are used
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// Getting table names and table schemas from the table info manager
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table names to global ids, and table schemas
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode; in standalone mode this is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, which will be `read` by `handle_inserts` and `write` by `flush_flow`,
    ///
    /// so that a series of events like `inserts -> flush` can be handled correctly
    flush_lock: RwLock<()>,
    /// Receives a oneshot sender used to send the state size report
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

/// Building [`StreamingEngine`]
impl StreamingEngine {
    /// Set the frontend invoker
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create **without** setting `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a flownode manager with `num_workers` workers
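    ///
    /// A hedged usage sketch (the returned workers are `!Send`, so each is
    /// expected to be driven on its own dedicated thread; the `run` method
    /// name is an assumption for illustration):
    /// ```ignore
    /// let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
    /// for mut worker in workers {
    ///     std::thread::spawn(move || worker.run());
    /// }
    /// ```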
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Add a worker handle to the manager, meaning the corresponding worker is under its management
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}

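/// A writeback payload for a sink table: a batch of row inserts or row
/// deletes, each row paired with a flow-internal timestamp.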
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

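/// Convert [`Batch`]es into insert [`DiffRequest`]s, one request per batch;
/// every row is tagged with timestamp 0.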
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

/// This impl block contains methods to send writeback requests to the frontend
impl StreamingEngine {
    /// Return the number of requests it made
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // handle the case where both the flow and the sink table no longer exist,
                        // but some output is still in the output buffer
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // extend the `update_at` col if needed:
                                // if the schema has a millisecond timestamp here and the result row doesn't, add it
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // the ts placeholder col, if the table was auto-created
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Generate writeback requests for all sink tables
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the table schema and primary key from the table info source; if the table doesn't exist, return None
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Return (primary keys, schema, and whether the table has a placeholder timestamp column);
    /// the schema of the table comes from the flow's output plan,
    ///
    /// adjusted to add the `update_at` column and the ts placeholder if needed
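    ///
    /// e.g. a flow plan with columns `(host, avg_load)` and no time index becomes
    /// `(host, avg_load, update_at, __ts_placeholder)`, with `__ts_placeholder` as
    /// the time index (the pre-adjustment column names here are illustrative):
    /// ```ignore
    /// let (pks, schema, no_time_index) = engine
    ///     .adjust_auto_created_table_schema(&flow_plan.schema)
    ///     .await?;
    /// assert!(no_time_index); // the plan had no time index, so a placeholder was appended
    /// ```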
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // TODO(discord9): consider removing the buggy auto create by schema

        // TODO(discord9): use the default key from schema
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if there is no time index, add one as a placeholder
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

/// Flow runtime related methods
impl StreamingEngine {
    /// Start the state report handler, which will receive a sender from the HeartbeatTask to send the state size report back
    ///
    /// If the heartbeat task is shut down, this future will exit too
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Run in the common_runtime background runtime
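    ///
    /// A hedged usage sketch (the broadcast channel wiring is illustrative):
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
    /// let handle = engine.clone().run_background(Some(shutdown_rx));
    /// // ... later, signal the main loop to exit:
    /// shutdown_tx.send(()).unwrap();
    /// ```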
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all flow errors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Trigger dataflow running, and then send writeback requests to the source sender
    ///
    /// Note that this method doesn't handle input mirror requests, as those should be handled by the gRPC server
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err;"Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err;"Send writeback request errors");
            };
            self.log_all_errors().await;

            // determine if we need to shut down
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown cmds have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            // for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
            // before triggering a run of the flow's worker
            let wait_for = since_last_run.elapsed();

            // the last run's insert speed
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
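            // worked example: at avg_spd == BATCH_SIZE rows/sec, new_wait is
            // BATCH_SIZE * 1000 / BATCH_SIZE = 1000 ms, which the `min()` above
            // caps at `default_interval` (1 s); faster inflow shortens the wait.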

            // print a trace every `run_per_trace` runs so that we can see if something is wrong,
            // but also don't get flooded with traces
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // the flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
        // StreamingEngine.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }

    /// Run all available subgraphs in the flow node.
    /// This will try to run all dataflows in this node.
    ///
    /// Set `blocking` to true to wait until the workers finish running,
    /// or false to just trigger a run and return immediately.
    /// Returns the number of rows sent to the workers (inaccurate).
    /// TODO(discord9): add a flag for subgraphs that have input since the last run
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            // TODO(discord9): consider how to handle errors in an individual worker
            worker.run_available(now, blocking).await?;
        }
        // check rows sent and rows remaining in the send buf
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Send write requests to the related source senders
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id, rows_len
        );
        Ok(())
    }
}

/// Create and remove flows
impl StreamingEngine {
    /// Remove a flow by its id
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Return the task id if a new task is created, otherwise return None.
    ///
    /// Steps to create a task:
    /// 1. parse the query into a typed plan (and optionally parse the `expire_after` expression)
    /// 2. render source/sink with the output table id and the used input table ids
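    ///
    /// A hedged caller sketch (field values are illustrative; the exact field
    /// types of [`CreateFlowArgs`] are assumptions here):
    /// ```ignore
    /// let created = engine
    ///     .create_flow_inner(CreateFlowArgs {
    ///         flow_id,
    ///         sink_table_name,
    ///         source_table_ids,
    ///         create_if_not_exists: true,
    ///         or_replace: false,
    ///         expire_after: None,
    ///         eval_interval: None,
    ///         comment: None,
    ///         sql: "SELECT count(*) FROM source_table".to_string(),
    ///         flow_options: Default::default(),
    ///         query_ctx: None,
    ///     })
    ///     .await?;
    /// ```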
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // construct an active dataflow state with it
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // check the schema against the actual table schema if it exists;
        // if it doesn't exist, create the sink table immediately
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // for column schemas, only `data_type` needs to be checked for equality,
            // since one can omit the flow's column names when writing the flow query;
            // print a user-friendly error message about the mismatch and how to correct it
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // if the table was auto created, the last one or two columns should be timestamps (update_at and the ts placeholder)
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // assign the inferred schema to the sink table and create it
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // TODO(discord9): add more than one handle
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // TODO(discord9): load balance?
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

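    /// Flush a flow: drain pending input rows, run all workers until caught up,
    /// then send writeback requests to the sink table. Returns the number of
    /// output rows written back.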
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // lock to make sure writes before the flush are written to the flow,
        // and drop it immediately to prevent subsequent writes from being blocked
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// FlowTickManager is a manager for the flow tick, which tracks flow execution progress
///
/// TODO(discord9): find a better way to do this, and don't expose the flow tick even to other flows,
/// to avoid a TSO coordination mess
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
    start: Instant,
    /// The timestamp when the flow started
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds
    ///
    /// TODO(discord9): reconsider, since `tick()` requires a monotonic clock and also needs to survive recovery later
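    ///
    /// e.g. if `start_timestamp` was captured as 1_700_000_000_000 ms and 250 ms
    /// have elapsed since `start`, `tick()` returns 1_700_000_000_250.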
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}