flow/
adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! For getting data from sources and sending results to sinks,
//! and communicating with other parts of the database
#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;

// `GREPTIME_TIMESTAMP` is deliberately not used here, to distinguish the case
// where the table is created automatically by flow
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Flow config that exists in both standalone & distributed mode
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
        }
    }
}
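
// Illustrative arithmetic for the default above: on an 8-CPU host this
// resolves to `(8 / 2).max(1) == 4` workers; on a single-CPU host
// `(1 / 2).max(1) == 1`, so there is always at least one worker.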

/// Options for the flow node
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            query: QueryOptions::default(),
            user_provider: None,
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}
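
// A minimal sketch (illustrative, not part of the original file) of overriding
// the defaults programmatically; `validate_sanitize` repairs a zero worker count:
//
//     let mut opts = FlownodeOptions {
//         grpc: GrpcOptions::default().with_bind_addr("0.0.0.0:3004"),
//         flow: FlowConfig { num_workers: 0 },
//         ..Default::default()
//     };
//     opts.validate_sanitize().unwrap();
//     assert!(opts.flow.num_workers >= 1);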

/// Arc-ed [`StreamingEngine`], cheaper to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// `StreamingEngine` manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just the current system timestamp for now
pub struct StreamingEngine {
    /// Handles to the workers that will run the dataflows;
    /// the workers are `!Send`, so handles are used
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// For getting table names and table schemas from the table info manager
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table name to global id, and table schemas
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode; in standalone mode it is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`,
    ///
    /// so that a series of events like `inserts -> flush` can be handled correctly
    flush_lock: RwLock<()>,
    /// Receives a oneshot sender used to send the state size report back
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

/// Building the streaming engine
impl StreamingEngine {
    /// Set the frontend invoker
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create **without** setting `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a streaming engine with `num_workers` workers
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Add a worker handle to the manager, meaning the corresponding worker is under its management
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}
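
// A minimal usage sketch (illustrative; `Worker::run` as the blocking loop entry
// point is an assumption, since the worker's API lives in the `worker` module
// and not in this file): because `Worker` is `!Send`, each returned worker is
// typically moved onto its own dedicated thread while the engine keeps only the
// `WorkerHandle`s:
//
//     let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
//     for mut worker in workers {
//         std::thread::spawn(move || worker.run()); // assumed blocking run loop
//     }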

#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }
}

pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}
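
// Illustrative: a batch of `n` rows becomes one `DiffRequest::Insert` holding `n`
// `(Row, 0)` pairs; the timestamp is fixed at 0 here, and the actual write time
// is attached later by `send_writeback_requests` via `tick_manager.tick()`.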

/// This impl block contains methods to send writeback requests to the frontend
impl StreamingEngine {
    /// Return the number of requests it made
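    ///
    /// Note on row padding (summarizing the logic below): rows shorter than the
    /// sink schema are extended, in order, with an `update_at` timestamp (the
    /// current tick) when the next expected column is a millisecond timestamp,
    /// and then with a zero placeholder timestamp for an auto-created time index.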
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // handle the case where both the flow and its sink table no longer exist,
                        // but some output is still sitting in the output buffer
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // extend the `update_at` col if needed:
                                // if the schema expects a millisecond timestamp here and the result row doesn't have it, add it
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // append the placeholder ts col if the sink table was auto-created
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Generate writeback requests for all sink tables
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the table schema and primary key from the table info source; if the table does not exist, return `None`
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Return (primary keys, schema, and whether the table has a placeholder timestamp column);
    /// the schema of the table comes from the flow's output plan,
    ///
    /// adjusted to add an `update_at` column and a ts placeholder if needed
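    ///
    /// Illustrative: a flow output of `(host STRING, cnt INT64)` with no time
    /// index yields the column list `[host, cnt, update_at, __ts_placeholder]`
    /// and `true` for the placeholder flag.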
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // TODO(discord9): consider removing the buggy auto-create-by-schema

        // TODO(discord9): use default key from schema
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if there is no time index, add one as a placeholder
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

/// Flow runtime related methods
impl StreamingEngine {
    /// Start the state report handler, which will receive a sender from the HeartbeatTask to send state size reports back
    ///
    /// If the heartbeat task is shut down, this future will exit too
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Run in the common_runtime background runtime
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all flow errors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Trigger dataflow running, and then send writeback requests to the source senders
    ///
    /// Note that this method doesn't handle input mirror requests; those are handled by the gRPC server
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst mode, so that if we miss a tick, we will run immediately to fully utilize the CPU
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            // determine if we need to shut down
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!("Shutdown channel lagged by {}, meaning multiple shutdown commands have been issued", num);
                    break;
                }
                None => (),
            }

            // for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
            // before triggering a run of the flow's worker
            let wait_for = since_last_run.elapsed();

            // the last run's insert speed
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
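            // Worked example (illustrative): if avg_spd settles at 10 * BATCH_SIZE
            // rows/sec, new_wait comes out to ~100 ms; if input stalls and avg_spd
            // decays toward 0, the 1 s `default_interval` cap takes over.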

            // print a trace every `run_per_trace` runs so that we can see if something is wrong,
            // but also don't get flooded with traces
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
        // StreamingEngine.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }

    /// Run all available subgraphs in the flow node;
    /// this will try to run all dataflows in this node
    ///
    /// Set `blocking` to true to wait until the workers finish running,
    /// false to just trigger a run and return immediately.
    /// Returns the number of rows sent to the workers (inaccurate).
    /// TODO(discord9): add a flag for subgraphs that have had input since the last run
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            // TODO(discord9): consider how to handle errors in individual workers
            worker.run_available(now, blocking).await?;
        }
        // check rows sent and rows remaining in the send buf
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Send write requests to the related source senders
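    ///
    /// A minimal caller sketch (assumption: invoked from the flownode's gRPC
    /// ingest path): `engine.handle_write_request(region_id, rows, &datatypes).await?`
    /// buffers the rows, and the next `run_available` pass feeds them to the workers.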
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id,
            rows_len
        );
        Ok(())
    }
}

/// Create & remove flows
impl StreamingEngine {
    /// Remove a flow by its id
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Return the task id if a new task is created, otherwise return None
    ///
    /// Steps to create a task:
    /// 1. parse the query into a typed plan (and optionally parse the expire_after expr)
    /// 2. render source/sink with the output table id and the input table ids used
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // construct an active dataflow state with it
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // check the schema against the actual table schema if it exists;
        // if it does not exist, create the sink table immediately
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // for column schemas, only `data_type` needs to be checked for equality,
            // since one can omit the flow's column names when writing the flow query;
            // print a user-friendly error message about mismatches and how to correct them
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                    reason: format!(
                                        "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                        idx,
                                        real.name,
                                        auto.name,
                                        real.data_type,
                                        auto.data_type
                                    ),
                                }
                                .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // if the table is auto-created, the last one or two columns should be timestamps (update_at and the ts placeholder)
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatch, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // assign the inferred schema to the sink table and create it
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // TODO(discord9): add more than one handle
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // TODO(discord9): load balance?
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

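    /// Flush a flow end to end: drain the input senders first, run all workers
    /// to completion, then write results back to the sink table; returns the
    /// number of output rows written back.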
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // lock to make sure writes before the flush are written to the flow,
        // and drop the guard immediately to prevent subsequent writes from being blocked
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// FlowTickManager is a manager for flow ticks, which tracks flow execution progress
///
/// TODO(discord9): find a better way to do this, and avoid exposing the flow tick even to other flows,
/// to avoid TSO coordination mess
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
    start: Instant,
    /// The timestamp when the flow started
    start_timestamp: repr::Timestamp,
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds
    ///
    /// TODO(discord9): reconsider, since `tick()` requires a monotonic clock and also needs to survive recovery later
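    ///
    /// Illustrative: if the engine started at Unix time `T0` ms, a call made `d` ms
    /// later returns `T0 + d`, derived from the monotonic `Instant` delta, so the
    /// value never goes backwards even if the system clock is adjusted.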
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_start = current - self.start;
        since_start.as_millis() as repr::Timestamp + self.start_timestamp
    }
}
951}