flow/adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! For getting data from sources, sending results to sinks,
//! and communicating with other parts of the database.
#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_base::memory_limit::MemoryLimit;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_stat::get_total_cpu_cores;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{Mutex, RwLock, broadcast, watch};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::FrontendInvoker;
use crate::error::Error;

// The placeholder timestamp column name; `GREPTIME_TIMESTAMP` is deliberately not used here,
// so that a timestamp column created automatically by flow can be distinguished.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Flow config that exists in both standalone and distributed mode
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (get_total_cpu_cores() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for flow node
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            // flownode's query parallelism is set to 1 to throttle flow's queries
            // so that they won't use too much CPU or memory
            query: QueryOptions {
                parallelism: 1,
                allow_query_fallback: false,
                memory_pool_size: MemoryLimit::default(),
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
        }
        Ok(())
    }
}
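
// A minimal sketch of how these options might appear in a flownode TOML config
// (field names follow the structs above; the exact file layout consumed by the
// binary is an assumption, not something this file defines):
//
//     node_id = 14
//
//     [flow]
//     num_workers = 4
//
//     [grpc]
//     bind_addr = "127.0.0.1:3004"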

/// Arc-ed `StreamingEngine`, cheaper to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// `StreamingEngine` manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using the current system timestamp for now
pub struct StreamingEngine {
    /// The handles to the workers that will run the dataflow,
    /// which are `!Send`, so handles are used
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// For getting table name and table schema from the table info manager
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table name to global id, and the table schema
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode; in standalone mode this is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
    ///
    /// So that a series of events like `inserts -> flush` can be handled correctly
    flush_lock: RwLock<()>,
}

/// Building a `StreamingEngine`
impl StreamingEngine {
    /// Set the frontend invoker
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create **without** setting `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
        }
    }

    /// Create a flownode manager with `num_workers` workers
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Add a worker handle to the manager, meaning the corresponding worker is under its management
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}
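
// A minimal sketch of wiring the engine up, assuming `query_engine` and
// `table_meta` are constructed elsewhere:
//
//     let (engine, workers) = StreamingEngine::new_with_workers(
//         Some(1), query_engine, table_meta, /* num_workers */ 2,
//     );
//     // `Worker` is `!Send`, so each worker must be run on its own dedicated
//     // thread; the engine keeps only the corresponding `WorkerHandle`s.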

#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

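/// Convert a list of [`Batch`]es into insert [`DiffRequest`]s, one request per
/// batch, with every row carrying timestamp `0`.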
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

/// This impl block contains methods to send writeback requests to the frontend
impl StreamingEngine {
    /// Return the number of requests it made
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // handle the case where both the flow and the sink table
                        // no longer exist, but some output is still in the output buffer
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
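            // Worked example (illustrative): for a flow whose output has two value
            // columns, an auto-created sink table has the schema
            // `[col_a, col_b, update_at, __ts_placeholder]`, so each 2-value output
            // row below is extended with `update_at = now` and the placeholder
            // timestamp `0` before being written back.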
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // extend with the `update_at` col if needed:
                                // if the schema expects a millisecond timestamp here
                                // and the result row doesn't have it, add it
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // add the placeholder ts col, if the table was auto created
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Generate writeback requests for all sink tables
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the table schema and primary key from the table info source; return `None` if the table does not exist
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
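            // NOTE: the table id was resolved just above, so the info value is
            // expected to be present; a concurrently dropped table would make
            // this `unwrap` panic.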
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let schema = meta.schema.column_schemas().to_vec();
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| schema[i].name.clone())
                .collect_vec();
            let time_index = meta.schema.timestamp_index();
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Return (primary keys, schema, and whether the table has a placeholder timestamp column).
    /// The schema of the table comes from the flow's output plan,
    /// adjusted to add an `update_at` column and a ts placeholder if needed.
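    /// (e.g. a plan schema `[col_a, col_b]` with no time index becomes
    /// `[col_a, col_b, update_at, __ts_placeholder]`, with `__ts_placeholder` as the time index)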
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // TODO(discord9): consider removing the buggy auto create by schema

        // TODO(discord9): use default key from schema
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if there is no time index, add one as a placeholder
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

/// Flow Runtime related methods
impl StreamingEngine {
    /// Run in the common_runtime background runtime
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move { self.run(shutdown).await })
    }

    /// Log all flow errors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Trigger dataflow running, and then send writeback requests to the source sender
    ///
    /// Note that this method doesn't handle input mirror requests, as those should be handled by the grpc server
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst mode, so that if we miss a tick, we will run immediately to fully utilize the CPU
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            // determine if we need to shut down
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown commands have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            // for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
            // before triggering a run of the flow's worker
            let wait_for = since_last_run.elapsed();

            // last run's insert speed
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); // in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
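            // Worked example (numbers illustrative): with BATCH_SIZE = 1_000 and an
            // average speed of 10_000 rows/s, the next wait is
            // 1_000 * 1000 / 10_000 = 100 ms; slow streams are capped at
            // `default_interval` (1s) by the `min` above.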

            // print a trace every `run_per_trace` runs so that we can see if something is wrong,
            // but also don't get flooded with traces
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // flow is now shut down; drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
        // StreamingEngine.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }

    /// Run all available subgraphs in the flow node.
    /// This will try to run all dataflows in this node.
    ///
    /// Set `blocking` to true to wait until the workers finish running,
    /// false to just trigger a run and return immediately.
    /// Returns the number of rows sent to the workers (inaccurate).
    /// TODO(discord9): add a flag for subgraphs that have had input since the last run
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            // TODO(discord9): consider how to handle errors in an individual worker
            worker.run_available(now, blocking).await?;
        }
        // check rows sent and rows remaining in the send buf
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Send a write request to the related source sender
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id, rows_len
        );
        Ok(())
    }
}

/// Create & remove flows
impl StreamingEngine {
    /// Remove a flow by its id
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Return the task id if a new task is created, otherwise return None
    ///
    /// Steps to create a task:
    /// 1. parse the query into a typed plan (and optionally parse the `expire_after` expr)
    /// 2. render source/sink with the output table id and used input table ids
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // construct an active dataflow state with it
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // check the schema against the actual table schema if it exists;
        // if it does not exist, create the sink table immediately
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // for column schemas, only `data_type` needs to be checked for equality,
            // since one can omit the flow's column names when writing the flow query;
            // print a user-friendly error message about mismatches and how to correct them
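            // e.g. a flow plan schema `[ts, cnt]` checked against a sink table
            // `[ts, cnt, update_at, __ts_placeholder]` passes: the trailing timestamp
            // columns are the auto-added ones accepted by the `Right(real)` arm below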
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // if the table was auto created, the last one or two columns
                        // should be timestamps (update_at and the ts placeholder)
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatch, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // assign the inferred schema to the sink table and create it
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // TODO(discord9): add more than one handle
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // TODO(discord9): load balance?
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // lock to make sure writes before the flush are written to the flow,
        // and drop immediately so that subsequent writes are not blocked
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// FlowTickManager is a manager for flow ticks, which tracks flow execution progress
///
/// TODO(discord9): find a better way to do this, and avoid exposing the flow tick even to other flows,
/// to avoid TSO coordination mess
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
    start: Instant,
    /// The timestamp when the flow started
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds
    ///
    /// TODO(discord9): reconsider, since `tick()` requires a monotonic clock and also needs to survive recovery later
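    /// (e.g. if `start_timestamp` was 1_700_000_000_000 ms at startup and 5 s have
    /// elapsed since `start`, `tick()` returns 1_700_000_005_000)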
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_start = current - self.start;
        since_start.as_millis() as repr::Timestamp + self.start_timestamp
    }
}