flow/adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! For getting data from sources and sending results to sinks,
//! and for communicating with other parts of the database.
#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_base::memory_limit::MemoryLimit;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_stat::get_total_cpu_cores;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{Mutex, RwLock, broadcast, watch};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::FrontendInvoker;
use crate::error::Error;
use crate::utils::StateReportHandler;

// `GREPTIME_TIMESTAMP` is deliberately not used as the name here, so that the placeholder
// timestamp column of a table created automatically by flow can be distinguished
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Flow config that exists in both standalone and distributed mode
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (get_total_cpu_cores() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for flow node
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            // flownode's query parallelism is set to 1 to throttle flow's queries
            // so that they won't use too much CPU or memory
            query: QueryOptions {
                parallelism: 1,
                allow_query_fallback: false,
                memory_pool_size: MemoryLimit::default(),
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}
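
// A minimal sketch of overriding the defaults programmatically (hypothetical
// usage, not part of this module's API; relies only on the public fields and
// the `with_bind_addr` builder shown above):
//
// let opts = FlownodeOptions {
//     node_id: Some(1),
//     grpc: GrpcOptions::default().with_bind_addr("0.0.0.0:3004"),
//     ..Default::default()
// };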

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
        }
        Ok(())
    }
}
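
// Sanitization sketch: a configured worker count of zero falls back to the
// same default as `FlowConfig::default()` (hypothetical usage for illustration):
//
// let mut opts = FlownodeOptions::default();
// opts.flow.num_workers = 0;
// opts.validate_sanitize().unwrap();
// assert_eq!(opts.flow.num_workers, (get_total_cpu_cores() / 2).max(1));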

/// Arc-ed FlowNodeManager, cheaper to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// `StreamingEngine` manages the state of all tasks in the flow node,
/// which should be run on the same thread.
///
/// For now, the timestamp used is simply the current system timestamp.
pub struct StreamingEngine {
    /// The handles to the workers that will run the dataflow;
    /// the workers are `!Send`, so handles are used instead
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// Getting table name and table schema from table info manager
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table name to global id, and table schema
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode; in standalone mode it is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`,
    /// so that a sequence of events like `inserts -> flush` is handled correctly
    flush_lock: RwLock<()>,
    /// Receives a oneshot sender used to send back the state size report
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

/// Building the streaming engine
impl StreamingEngine {
    /// Set the frontend invoker
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create **without** setting `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a streaming engine with the given number of workers
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Add a worker handle to the engine, meaning the corresponding worker is under its management
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}
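
// A hedged wiring sketch: `new_with_workers` returns the engine plus the
// `!Send` workers, which must each be driven on their own thread. The
// `query_engine` / `table_meta` values and the exact worker run-loop API are
// assumptions for illustration, not guarantees of this module:
//
// let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
// for mut worker in workers {
//     std::thread::spawn(move || worker.run()); // assuming a blocking run loop
// }
// let engine = Arc::new(engine);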

#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}
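
// Usage sketch: each input `Batch` becomes one `DiffRequest::Insert` whose rows
// carry a placeholder timestamp of 0 (the real write timestamp is attached
// later from the tick manager). Counting the total rows then mirrors what
// `send_writeback_requests` does below:
//
// let reqs = batches_to_rows_req(batches)?;
// let total_rows: usize = reqs.iter().map(|r| r.len()).sum();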

/// This impl block contains methods to send writeback requests to the frontend
impl StreamingEngine {
    /// Returns the number of requests it made
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // deal with the case where both the flow and the sink table no
                        // longer exist, but some output is still in the output buffer
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // extend the `update_at` column if needed:
                                // if the schema expects a millisecond timestamp here and the
                                // result row doesn't have it, add it
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // the timestamp placeholder column, if the table was auto-created
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }
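
    // Row-extension sketch (illustrative column names): for an auto-created sink
    // table with schema (host, avg_cpu, update_at, __ts_placeholder), a flow
    // output row (host, avg_cpu) is extended above to (host, avg_cpu, now, 0),
    // where `now` comes from the tick manager and 0 fills the placeholder
    // time index.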

    /// Generate writeback requests for all sink tables
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the table schema and primary key from the table info source;
    /// returns `None` if the table does not exist
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Returns (primary keys, schema, and whether the table has a placeholder timestamp column).
    /// The schema of the table comes from the flow's output plan,
    /// adjusted to add an `update_at` column and a timestamp placeholder if needed.
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // TODO(discord9): consider removing buggy auto create by schema

        // TODO(discord9): use default key from schema
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if there is no time index, add one as a placeholder
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
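
    // Schema-adjustment sketch (illustrative column names): a flow output of
    // (host STRING, avg_cpu DOUBLE) with no time index yields the sink schema
    // (host, avg_cpu, update_at, __ts_placeholder), with `__ts_placeholder`
    // marked as the time index and `no_time_index == true`.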
}

/// Flow runtime related methods
impl StreamingEngine {
    /// Start the state report handler, which will receive a sender from the HeartbeatTask
    /// to send the state size report back.
    ///
    /// If the heartbeat task is shut down, this future exits too.
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Run in the common_runtime background runtime
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all flow errors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Trigger dataflow running, and then send writeback requests to the frontend.
    ///
    /// Note that this method doesn't handle input mirror requests, as those should be
    /// handled by the gRPC server.
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err;"Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err;"Send writeback request errors");
            };
            self.log_all_errors().await;

            // determine if we need to shut down
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown cmds have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            // for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
            // before triggering a run of the flow's worker
            let wait_for = since_last_run.elapsed();

            // the last run's insert speed
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
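
            // Worked example with illustrative numbers: if the last run handled
            // row_cnt = 4000 rows after waiting 500 ms, then cur_spd =
            // 4000 * 1000 / 500 = 8000 rows/s. A higher value replaces avg_spd
            // immediately (rapid increase), while a slower run only pulls the
            // average down by a tenth per iteration (slow decay). The next wait
            // is then BATCH_SIZE * 1000 / avg_spd ms, capped at
            // `default_interval` (1 s).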

            // print a trace every `run_per_trace` runs so that we can see if something is wrong,
            // but also don't get flooded with traces
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
        // FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }

    /// Run all available subgraphs in the flow node.
    /// This will try to run all dataflows in this node.
    ///
    /// Set `blocking` to true to wait until the workers finish running,
    /// or false to just trigger a run and return immediately.
    /// Returns the number of rows sent to the workers (inaccurate).
    /// TODO(discord9): add a flag for subgraphs that have input since the last run
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            // TODO(discord9): consider how to handle errors in individual workers
            worker.run_available(now, blocking).await?;
        }
        // check rows sent and rows remaining in the send buf
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Send write requests to the related source senders
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id, rows_len
        );
        Ok(())
    }
}

/// Create & remove flows
impl StreamingEngine {
    /// Remove a flow by its id
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Returns the task id if a new task is created, otherwise returns None.
    ///
    /// Steps to create a task:
    /// 1. parse the query into a typed plan (and optionally parse the expire_after expr)
    /// 2. render source/sink with the output table id and the input table ids used
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // construct an active dataflow state with it
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // check the schema against the actual table schema if it exists;
        // if it does not exist, create the sink table immediately
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // for the column schema, only `data_type` needs to be checked for equality,
            // since one can omit the flow's column names when writing the flow query;
            // print a user-friendly error message about any mismatch and how to correct it
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                    reason: format!(
                                        "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                        idx,
                                        real.name,
                                        auto.name,
                                        real.data_type,
                                        auto.data_type
                                    ),
                                }
                                .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // if the table was auto-created, the last one or two columns should be
                        // timestamps (`update_at` and the ts placeholder)
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // assign the inferred schema to the sink table
            // and create the sink table
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // TODO(discord9): add more than one handle
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // TODO(discord9): load balance?
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }
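
    // Hedged usage sketch (values illustrative; `CreateFlowArgs` fields as
    // destructured at the top of `create_flow_inner`):
    //
    // let args = CreateFlowArgs {
    //     flow_id,
    //     sink_table_name,   // [catalog, schema, table]
    //     source_table_ids,  // tables the flow reads from
    //     create_if_not_exists: true,
    //     or_replace: false,
    //     sql: "SELECT avg(cpu) FROM host_metrics".to_string(),
    //     /* remaining fields elided */
    // };
    // let created = engine.create_flow_inner(args).await?;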

    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // lock to make sure writes before the flush are written to the flow,
        // and drop immediately so that subsequent writes are not blocked
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// FlowTickManager is a manager for flow ticks, which tracks flow execution progress.
///
/// TODO(discord9): find a better way to do it, and avoid exposing the flow tick even to other
/// flows, to prevent TSO coordination mess
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
    start: Instant,
    /// The timestamp when the flow started
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Returns the current timestamp in milliseconds
    ///
    /// TODO(discord9): reconsider since `tick()` requires a monotonic clock and also needs to survive recovery later
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}
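
// Tick arithmetic sketch: if the manager was created at wall-clock time T0
// (milliseconds since the Unix epoch) and the monotonic clock has since
// advanced by `e` milliseconds, `tick()` returns T0 + e. Because `Instant` is
// monotonic, successive ticks never go backwards even if the system clock is
// stepped, e.g.:
//
// let tm = FlowTickManager::new();
// let t1 = tm.tick();
// let t2 = tm.tick();
// assert!(t2 >= t1);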