Skip to main content

flow/
adapter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Gets data from sources, sends results to sinks, and communicates with
//! other parts of the database.
17#![warn(unused_imports)]
18
19use std::collections::BTreeMap;
20use std::sync::Arc;
21use std::time::{Duration, Instant, SystemTime};
22
23use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
24use common_base::memory_limit::MemoryLimit;
25use common_config::Configurable;
26use common_error::ext::BoxedError;
27use common_meta::key::TableMetadataManagerRef;
28use common_options::memory::MemoryOptions;
29use common_runtime::JoinHandle;
30use common_stat::get_total_cpu_cores;
31use common_telemetry::logging::{LoggingOptions, TracingOptions};
32use common_telemetry::{debug, info, trace};
33use datatypes::schema::ColumnSchema;
34use datatypes::value::Value;
35use greptime_proto::v1;
36use itertools::{EitherOrBoth, Itertools};
37use meta_client::MetaClientOptions;
38use query::QueryEngine;
39use query::options::QueryOptions;
40use serde::{Deserialize, Serialize};
41use servers::grpc::GrpcOptions;
42use servers::http::HttpOptions;
43use session::context::QueryContext;
44use snafu::{OptionExt, ResultExt, ensure};
45use store_api::storage::{ConcreteDataType, RegionId};
46use table::metadata::TableId;
47use tokio::sync::broadcast::error::TryRecvError;
48use tokio::sync::{Mutex, RwLock, broadcast, watch};
49
50pub(crate) use crate::adapter::node_context::FlownodeContext;
51use crate::adapter::refill::RefillTask;
52use crate::adapter::table_source::ManagedTableSource;
53use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
54pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
55use crate::batching_mode::BatchingModeOptions;
56use crate::compute::ErrCollector;
57use crate::df_optimizer::sql_to_flow_plan;
58use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
59use crate::expr::Batch;
60use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
61use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
62use crate::{CreateFlowArgs, FlowId, TableName};
63
64pub(crate) mod flownode_impl;
65mod parse_expr;
66pub(crate) mod refill;
67mod stat;
68#[cfg(test)]
69mod tests;
70pub(crate) mod util;
71mod worker;
72
73pub(crate) mod node_context;
74pub(crate) mod table_source;
75
76use crate::FrontendInvoker;
77use crate::error::Error;
78
/// Name of the placeholder time-index column added to a sink table that flow
/// auto-creates when the flow's output has no time index.
///
/// A dedicated name is used instead of `GREPTIME_TIMESTAMP` so that a table
/// created automatically by flow can be told apart. The column is filled with
/// timestamp `0` when rows are written back.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

/// Name of the millisecond-timestamp column appended to auto-created sink
/// tables; it is filled with the current tick when rows are written back.
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
83
84/// Flow config that exists both in standalone&distributed mode
85#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
86#[serde(default)]
87pub struct FlowConfig {
88    pub num_workers: usize,
89    pub batching_mode: BatchingModeOptions,
90}
91
92impl Default for FlowConfig {
93    fn default() -> Self {
94        Self {
95            num_workers: (get_total_cpu_cores() / 2).max(1),
96            batching_mode: BatchingModeOptions::default(),
97        }
98    }
99}
100
101/// Options for flow node
102#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
103#[serde(default)]
104pub struct FlownodeOptions {
105    pub node_id: Option<u64>,
106    pub flow: FlowConfig,
107    pub grpc: GrpcOptions,
108    pub http: HttpOptions,
109    pub meta_client: Option<MetaClientOptions>,
110    pub logging: LoggingOptions,
111    pub tracing: TracingOptions,
112    pub query: QueryOptions,
113    pub memory: MemoryOptions,
114}
115
116impl Default for FlownodeOptions {
117    fn default() -> Self {
118        Self {
119            node_id: None,
120            flow: FlowConfig::default(),
121            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
122            http: HttpOptions::default(),
123            meta_client: None,
124            logging: LoggingOptions::default(),
125            tracing: TracingOptions::default(),
126            // flownode's query option is set to 1 to throttle flow's query so
127            // that it won't use too much cpu or memory
128            query: QueryOptions {
129                parallelism: 1,
130                allow_query_fallback: false,
131                memory_pool_size: MemoryLimit::default(),
132                enable_per_region_metrics: false,
133            },
134            memory: MemoryOptions::default(),
135        }
136    }
137}
138
139impl Configurable for FlownodeOptions {
140    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
141        if self.flow.num_workers == 0 {
142            self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
143        }
144        Ok(())
145    }
146}
147
/// Arc-ed [`StreamingEngine`], cheaper to clone
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// Streaming flow engine (referred to as `FlowNodeManager` in older comments).
///
/// Manages the state of all streaming tasks in the flow node, which should be
/// run on the same thread.
///
/// The choice of timestamp is just using current system timestamp for now.
pub struct StreamingEngine {
    /// Handles to the workers that run the dataflows.
    /// Workers are `!Send`, so the engine drives them through these handles.
    pub worker_handles: Vec<WorkerHandle>,
    /// The selector to select a worker to run the dataflow
    worker_selector: Mutex<usize>,
    /// The query engine that will be used to parse the query and convert it to a dataflow plan
    pub query_engine: Arc<dyn QueryEngine>,
    /// Getting table name and table schema from table info manager
    table_info_source: ManagedTableSource,
    /// Invoker used to write flow output back through the frontend.
    /// `None` until `set_frontend_invoker` is called; taken again when `run` exits.
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// contains mapping from table name to global id, and table schema
    node_context: RwLock<FlownodeContext>,
    /// Contains all refill tasks
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    /// Per-flow error collectors; their contents are logged by `log_all_errors`.
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    /// Per source table watch receivers; presumably report the length of that
    /// table's send buffer — TODO confirm against the sending side.
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    /// Produces the current tick, used as the timestamp for runs and write-back.
    tick_manager: FlowTickManager,
    /// This node id is only available in distributed mode, on standalone mode this is guaranteed to be `None`
    pub node_id: Option<u32>,
    /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow`
    ///
    /// So that a series of event like `inserts -> flush` can be handled correctly
    flush_lock: RwLock<()>,
}
180
181/// Building FlownodeManager
182impl StreamingEngine {
183    /// set frontend invoker
184    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
185        *self.frontend_invoker.write().await = Some(frontend);
186    }
187
188    /// Create **without** setting `frontend_invoker`
189    pub fn new(
190        node_id: Option<u32>,
191        query_engine: Arc<dyn QueryEngine>,
192        table_meta: TableMetadataManagerRef,
193    ) -> Self {
194        let srv_map = ManagedTableSource::new(
195            table_meta.table_info_manager().clone(),
196            table_meta.table_name_manager().clone(),
197        );
198        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
199        let tick_manager = FlowTickManager::new();
200        let worker_handles = Vec::new();
201        StreamingEngine {
202            worker_handles,
203            worker_selector: Mutex::new(0),
204            query_engine,
205            table_info_source: srv_map,
206            frontend_invoker: RwLock::new(None),
207            node_context: RwLock::new(node_context),
208            refill_tasks: Default::default(),
209            flow_err_collectors: Default::default(),
210            src_send_buf_lens: Default::default(),
211            tick_manager,
212            node_id,
213            flush_lock: RwLock::new(()),
214        }
215    }
216
217    /// Create a flownode manager with one worker
218    pub fn new_with_workers<'s>(
219        node_id: Option<u32>,
220        query_engine: Arc<dyn QueryEngine>,
221        table_meta: TableMetadataManagerRef,
222        num_workers: usize,
223    ) -> (Self, Vec<Worker<'s>>) {
224        let mut zelf = Self::new(node_id, query_engine, table_meta);
225
226        let workers: Vec<_> = (0..num_workers)
227            .map(|_| {
228                let (handle, worker) = create_worker();
229                zelf.add_worker_handle(handle);
230                worker
231            })
232            .collect();
233        (zelf, workers)
234    }
235
236    /// add a worker handler to manager, meaning this corresponding worker is under it's manage
237    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
238        self.worker_handles.push(handle);
239    }
240}
241
242#[derive(Debug)]
243pub enum DiffRequest {
244    Insert(Vec<(Row, repr::Timestamp)>),
245    Delete(Vec<(Row, repr::Timestamp)>),
246}
247
248impl DiffRequest {
249    pub fn len(&self) -> usize {
250        match self {
251            Self::Insert(v) => v.len(),
252            Self::Delete(v) => v.len(),
253        }
254    }
255
256    pub fn is_empty(&self) -> bool {
257        self.len() == 0
258    }
259}
260
261pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
262    let mut reqs = Vec::new();
263    for batch in batches {
264        let mut rows = Vec::with_capacity(batch.row_count());
265        for i in 0..batch.row_count() {
266            let row = batch.get_row(i).context(EvalSnafu)?;
267            rows.push((Row::new(row), 0));
268        }
269        reqs.push(DiffRequest::Insert(rows));
270    }
271    Ok(reqs)
272}
273
274/// This impl block contains methods to send writeback requests to frontend
275impl StreamingEngine {
276    /// Return the number of requests it made
277    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
278        let all_reqs = self.generate_writeback_request().await?;
279        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
280            return Ok(0);
281        }
282        let mut req_cnt = 0;
283        for (table_name, reqs) in all_reqs {
284            if reqs.is_empty() {
285                continue;
286            }
287            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
288            let ctx = Arc::new(QueryContext::with(&catalog, &schema));
289
290            let (is_ts_placeholder, proto_schema) = match self
291                .try_fetch_existing_table(&table_name)
292                .await?
293                .context(UnexpectedSnafu {
294                    reason: format!("Table not found: {}", table_name.join(".")),
295                }) {
296                Ok(r) => r,
297                Err(e) => {
298                    if self
299                        .table_info_source
300                        .get_opt_table_id_from_name(&table_name)
301                        .await?
302                        .is_none()
303                    {
304                        // deal with both flow&sink table no longer exists
305                        // but some output is still in output buf
306                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
307                        continue;
308                    } else {
309                        return Err(e);
310                    }
311                }
312            };
313            let schema_len = proto_schema.len();
314
315            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
316            trace!(
317                "Sending {} writeback requests to table {}, reqs total rows={}",
318                reqs.len(),
319                table_name.join("."),
320                reqs.iter().map(|r| r.len()).sum::<usize>()
321            );
322
323            METRIC_FLOW_ROWS
324                .with_label_values(&["out-streaming"])
325                .inc_by(total_rows as u64);
326
327            let now = self.tick_manager.tick();
328            for req in reqs {
329                match req {
330                    DiffRequest::Insert(insert) => {
331                        let rows_proto: Vec<v1::Row> = insert
332                            .into_iter()
333                            .map(|(mut row, _ts)| {
334                                // extend `update_at` col if needed
335                                // if schema include a millisecond timestamp here, and result row doesn't have it, add it
336                                if row.len() < proto_schema.len()
337                                    && proto_schema[row.len()].datatype
338                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
339                                            as i32
340                                {
341                                    row.extend([Value::from(
342                                        common_time::Timestamp::new_millisecond(now),
343                                    )]);
344                                }
345                                // ts col, if auto create
346                                if is_ts_placeholder {
347                                    ensure!(
348                                        row.len() == schema_len - 1,
349                                        InternalSnafu {
350                                            reason: format!(
351                                                "Row len mismatch, expect {} got {}",
352                                                schema_len - 1,
353                                                row.len()
354                                            )
355                                        }
356                                    );
357                                    row.extend([Value::from(
358                                        common_time::Timestamp::new_millisecond(0),
359                                    )]);
360                                }
361                                if row.len() != proto_schema.len() {
362                                    UnexpectedSnafu {
363                                        reason: format!(
364                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
365                                            proto_schema.len(),
366                                            row.len(),
367                                            proto_schema.iter().map(|c|&c.column_name).collect_vec()
368                                        ),
369                                    }
370                                    .fail()?;
371                                }
372                                Ok(row.into())
373                            })
374                            .collect::<Result<Vec<_>, Error>>()?;
375                        let table_name = table_name.last().unwrap().clone();
376                        let req = RowInsertRequest {
377                            table_name,
378                            rows: Some(v1::Rows {
379                                schema: proto_schema.clone(),
380                                rows: rows_proto,
381                            }),
382                        };
383                        req_cnt += 1;
384                        self.frontend_invoker
385                            .read()
386                            .await
387                            .as_ref()
388                            .with_context(|| UnexpectedSnafu {
389                                reason: "Expect a frontend invoker for flownode to write back",
390                            })?
391                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
392                            .await
393                            .map_err(BoxedError::new)
394                            .with_context(|_| ExternalSnafu {})?;
395                    }
396                    DiffRequest::Delete(remove) => {
397                        info!("original remove rows={:?}", remove);
398                        let rows_proto: Vec<v1::Row> = remove
399                            .into_iter()
400                            .map(|(mut row, _ts)| {
401                                row.extend(Some(Value::from(
402                                    common_time::Timestamp::new_millisecond(0),
403                                )));
404                                row.into()
405                            })
406                            .collect::<Vec<_>>();
407                        let table_name = table_name.last().unwrap().clone();
408                        let req = RowDeleteRequest {
409                            table_name,
410                            rows: Some(v1::Rows {
411                                schema: proto_schema.clone(),
412                                rows: rows_proto,
413                            }),
414                        };
415
416                        req_cnt += 1;
417                        self.frontend_invoker
418                            .read()
419                            .await
420                            .as_ref()
421                            .with_context(|| UnexpectedSnafu {
422                                reason: "Expect a frontend invoker for flownode to write back",
423                            })?
424                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
425                            .await
426                            .map_err(BoxedError::new)
427                            .with_context(|_| ExternalSnafu {})?;
428                    }
429                }
430            }
431        }
432        Ok(req_cnt)
433    }
434
435    /// Generate writeback request for all sink table
436    pub async fn generate_writeback_request(
437        &self,
438    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
439        trace!("Start to generate writeback request");
440        let mut output = BTreeMap::new();
441        let mut total_row_count = 0;
442        for (name, sink_recv) in self
443            .node_context
444            .write()
445            .await
446            .sink_receiver
447            .iter_mut()
448            .map(|(n, (_s, r))| (n, r))
449        {
450            let mut batches = Vec::new();
451            while let Ok(batch) = sink_recv.try_recv() {
452                total_row_count += batch.row_count();
453                batches.push(batch);
454            }
455            let reqs = batches_to_rows_req(batches)?;
456            output.insert(name.clone(), reqs);
457        }
458        trace!("Prepare writeback req: total row count={}", total_row_count);
459        Ok(output)
460    }
461
462    /// Fetch table schema and primary key from table info source, if table not exist return None
463    async fn fetch_table_pk_schema(
464        &self,
465        table_name: &TableName,
466    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
467        if let Some(table_id) = self
468            .table_info_source
469            .get_opt_table_id_from_name(table_name)
470            .await?
471        {
472            let table_info = self
473                .table_info_source
474                .get_table_info_value(&table_id)
475                .await?
476                .unwrap();
477            let meta = table_info.table_info.meta;
478            let schema = meta.schema.column_schemas().to_vec();
479            let primary_keys = meta
480                .primary_key_indices
481                .into_iter()
482                .map(|i| schema[i].name.clone())
483                .collect_vec();
484            let time_index = meta.schema.timestamp_index();
485            Ok(Some((primary_keys, time_index, schema)))
486        } else {
487            Ok(None)
488        }
489    }
490
491    /// return (primary keys, schema and if the table have a placeholder timestamp column)
492    /// schema of the table comes from flow's output plan
493    ///
494    /// adjust to add `update_at` column and ts placeholder if needed
495    async fn adjust_auto_created_table_schema(
496        &self,
497        schema: &RelationDesc,
498    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
499        // TODO(discord9): consider remove buggy auto create by schema
500
501        // TODO(discord9): use default key from schema
502        let primary_keys = schema
503            .typ()
504            .keys
505            .first()
506            .map(|v| {
507                v.column_indices
508                    .iter()
509                    .map(|i| {
510                        schema
511                            .get_name(*i)
512                            .clone()
513                            .unwrap_or_else(|| format!("col_{i}"))
514                    })
515                    .collect_vec()
516            })
517            .unwrap_or_default();
518        let update_at = ColumnSchema::new(
519            AUTO_CREATED_UPDATE_AT_TS_COL,
520            ConcreteDataType::timestamp_millisecond_datatype(),
521            true,
522        );
523
524        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);
525
526        let mut with_auto_added_col = original_schema.clone();
527        with_auto_added_col.push(update_at);
528
529        // if no time index, add one as placeholder
530        let no_time_index = schema.typ().time_index.is_none();
531        if no_time_index {
532            let ts_col = ColumnSchema::new(
533                AUTO_CREATED_PLACEHOLDER_TS_COL,
534                ConcreteDataType::timestamp_millisecond_datatype(),
535                true,
536            )
537            .with_time_index(true);
538            with_auto_added_col.push(ts_col);
539        }
540
541        Ok((primary_keys, with_auto_added_col, no_time_index))
542    }
543}
544
545/// Flow Runtime related methods
546impl StreamingEngine {
547    /// run in common_runtime background runtime
548    pub fn run_background(
549        self: Arc<Self>,
550        shutdown: Option<broadcast::Receiver<()>>,
551    ) -> JoinHandle<()> {
552        info!("Starting flownode manager's background task");
553        common_runtime::spawn_global(async move { self.run(shutdown).await })
554    }
555
556    /// log all flow errors
557    pub async fn log_all_errors(&self) {
558        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
559            let all_errors = f_err.get_all().await;
560            if !all_errors.is_empty() {
561                let all_errors = all_errors
562                    .into_iter()
563                    .map(|i| format!("{:?}", i))
564                    .join("\n");
565                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
566            }
567        }
568    }
569
    /// Main loop of the streaming engine: each iteration triggers a dataflow run on all
    /// workers, writes the produced output back to the sink tables, and logs collected
    /// flow errors.
    ///
    /// Between iterations it waits for an adaptive interval capped at one second. The
    /// loop exits once `shutdown` yields a value, is closed, or reports lag; when
    /// `shutdown` is `None` it never exits. Errors from a run or a write-back are logged
    /// and do not stop the loop. On exit the frontend invoker is dropped.
    ///
    /// Note that this method does not handle input mirror requests, as those should be
    /// handled by the grpc server.
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        let mut avg_spd = 0; // rows/sec, smoothed across iterations
        let mut since_last_run = tokio::time::Instant::now();
        // the speed traces are emitted only once every `run_per_trace` iterations
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // TODO(discord9): only run when new inputs arrive or scheduled to
            // Non-blocking run; a failed run counts as zero rows so the loop keeps going.
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err;"Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err;"Send writeback request errors");
            };
            self.log_all_errors().await;

            // determine if need to shutdown: a received message, a closed channel or lag
            // all stop the loop; an empty channel (or no channel at all) keeps it running
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown cmd have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            // for now we want to batch rows until there is around `BATCH_SIZE` rows in send buf
            // before trigger a run of flow's worker.
            // `since_last_run` is reset right before waiting, so this covers the previous
            // wait plus this iteration's work
            let wait_for = since_last_run.elapsed();

            // last run's insert speed, in rows/sec (elapsed clamped to >= 1 ms)
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decay
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // time to accumulate about `BATCH_SIZE` rows at `avg_spd`, capped at `default_interval`
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            // print trace every `run_per_trace` times so that we can see if there is something wrong
            // but also not get flooded with trace
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            // wake on whichever comes first: the next interval tick or the adaptive wait
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // flow is now shutdown, drop frontend_invoker early so that a ref cycle (in standalone mode) is prevented:
        // FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
        // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_streaming_engine.frontend_invoker
        self.frontend_invoker.write().await.take();
    }
653
654    /// Run all available subgraph in the flow node
655    /// This will try to run all dataflow in this node
656    ///
657    /// set `blocking` to true to wait until worker finish running
658    /// false to just trigger run and return immediately
659    /// return numbers of rows send to worker(Inaccuary)
660    /// TODO(discord9): add flag for subgraph that have input since last run
661    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
662        let mut row_cnt = 0;
663
664        let now = self.tick_manager.tick();
665        for worker in self.worker_handles.iter() {
666            // TODO(discord9): consider how to handle error in individual worker
667            worker.run_available(now, blocking).await?;
668        }
669        // check row send and rows remain in send buf
670        let flush_res = if blocking {
671            let ctx = self.node_context.read().await;
672            ctx.flush_all_sender().await
673        } else {
674            match self.node_context.try_read() {
675                Ok(ctx) => ctx.flush_all_sender().await,
676                Err(_) => return Ok(row_cnt),
677            }
678        };
679        match flush_res {
680            Ok(r) => {
681                common_telemetry::trace!("Total flushed {} rows", r);
682                row_cnt += r;
683            }
684            Err(err) => {
685                common_telemetry::error!("Flush send buf errors: {:?}", err);
686            }
687        };
688
689        Ok(row_cnt)
690    }
691
692    /// send write request to related source sender
693    pub async fn handle_write_request(
694        &self,
695        region_id: RegionId,
696        rows: Vec<DiffRow>,
697        batch_datatypes: &[ConcreteDataType],
698    ) -> Result<(), Error> {
699        let rows_len = rows.len();
700        let table_id = region_id.table_id();
701        let _timer = METRIC_FLOW_INSERT_ELAPSED
702            .with_label_values(&[table_id.to_string().as_str()])
703            .start_timer();
704        self.node_context
705            .read()
706            .await
707            .send(table_id, rows, batch_datatypes)
708            .await?;
709        trace!(
710            "Handling write request for table_id={} with {} rows",
711            table_id, rows_len
712        );
713        Ok(())
714    }
715}
716
717/// Create&Remove flow
718impl StreamingEngine {
719    /// remove a flow by it's id
720    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
721        for handle in self.worker_handles.iter() {
722            if handle.contains_flow(flow_id).await? {
723                handle.remove_flow(flow_id).await?;
724                break;
725            }
726        }
727        self.node_context.write().await.remove_flow(flow_id);
728        Ok(())
729    }
730
731    /// Return task id if a new task is created, otherwise return None
732    ///
733    /// steps to create task:
734    /// 1. parse query into typed plan(and optional parse expire_after expr)
735    /// 2. render source/sink with output table id and used input table id
736    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
737        let CreateFlowArgs {
738            flow_id,
739            sink_table_name,
740            source_table_ids,
741            create_if_not_exists,
742            or_replace,
743            expire_after,
744            eval_interval: _,
745            comment,
746            sql,
747            flow_options,
748            query_ctx,
749        } = args;
750
751        let mut node_ctx = self.node_context.write().await;
752        // assign global id to source and sink table
753        for source in &source_table_ids {
754            node_ctx
755                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
756                .await?;
757        }
758        node_ctx
759            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
760            .await?;
761
762        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());
763
764        node_ctx.query_context = query_ctx.map(Arc::new);
765        // construct a active dataflow state with it
766        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
767
768        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
769
770        // check schema against actual table schema if exists
771        // if not exist create sink table immediately
772        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
773            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);
774
775            // for column schema, only `data_type` need to be check for equality
776            // since one can omit flow's column name when write flow query
777            // print a user friendly error message about mismatch and how to correct them
778            for (idx, zipped) in auto_schema
779                .iter()
780                .zip_longest(real_schema.iter())
781                .enumerate()
782            {
783                match zipped {
784                    EitherOrBoth::Both(auto, real) => {
785                        if auto.data_type != real.data_type {
786                            InvalidQuerySnafu {
787                                    reason: format!(
788                                        "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
789                                        idx,
790                                        real.name,
791                                        auto.name,
792                                        real.data_type,
793                                        auto.data_type
794                                    ),
795                                }
796                                .fail()?;
797                        }
798                    }
799                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
800                        // if table is auto created, the last one or two column should be timestamp(update at and ts placeholder)
801                        continue;
802                    }
803                    _ => InvalidQuerySnafu {
804                        reason: format!(
805                            "schema length mismatched, expected {} found {}",
806                            real_schema.len(),
807                            auto_schema.len()
808                        ),
809                    }
810                    .fail()?,
811                }
812            }
813        } else {
814            // assign inferred schema to sink table
815            // create sink table
816            let did_create = self
817                .create_table_from_relation(
818                    &format!("flow-id={flow_id}"),
819                    &sink_table_name,
820                    &flow_plan.schema,
821                )
822                .await?;
823            if !did_create {
824                UnexpectedSnafu {
825                    reason: format!("Failed to create table {:?}", sink_table_name),
826                }
827                .fail()?;
828            }
829        }
830
831        node_ctx.add_flow_plan(flow_id, flow_plan.clone());
832
833        let _ = comment;
834        let _ = flow_options;
835
836        // TODO(discord9): add more than one handles
837        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
838        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;
839
840        let source_ids = source_table_ids
841            .iter()
842            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
843            .collect_vec();
844        let source_receivers = source_ids
845            .iter()
846            .map(|id| {
847                node_ctx
848                    .get_source_by_global_id(id)
849                    .map(|s| s.get_receiver())
850            })
851            .collect::<Result<Vec<_>, _>>()?;
852        let err_collector = ErrCollector::default();
853        self.flow_err_collectors
854            .write()
855            .await
856            .insert(flow_id, err_collector.clone());
857        // TODO(discord9): load balance?
858        let handle = self.get_worker_handle_for_create_flow().await;
859        let create_request = worker::Request::Create {
860            flow_id,
861            plan: flow_plan,
862            sink_id,
863            sink_sender,
864            source_ids,
865            src_recvs: source_receivers,
866            expire_after,
867            or_replace,
868            create_if_not_exists,
869            err_collector,
870        };
871
872        handle.create_flow(create_request).await?;
873        info!("Successfully create flow with id={}", flow_id);
874        Ok(Some(flow_id))
875    }
876
877    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
878        debug!("Starting to flush flow_id={:?}", flow_id);
879        // lock to make sure writes before flush are written to flow
880        // and immediately drop to prevent following writes to be blocked
881        drop(self.flush_lock.write().await);
882        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
883        let rows_send = self.run_available(true).await?;
884        let row = self.send_writeback_requests().await?;
885        debug!(
886            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
887            flow_id, flushed_input_rows, rows_send, row
888        );
889        Ok(row)
890    }
891
892    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
893        let mut exist = false;
894        for handle in self.worker_handles.iter() {
895            if handle.contains_flow(flow_id).await? {
896                exist = true;
897                break;
898            }
899        }
900        Ok(exist)
901    }
902}
903
904/// FlowTickManager is a manager for flow tick, which trakc flow execution progress
905///
906/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
907/// TSO coord mess
908#[derive(Clone, Debug)]
909pub struct FlowTickManager {
910    /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp
911    start: Instant,
912    /// The timestamp when the flow started
913    start_timestamp: repr::Timestamp,
914}
915
916impl Default for FlowTickManager {
917    fn default() -> Self {
918        Self::new()
919    }
920}
921
922impl FlowTickManager {
923    pub fn new() -> Self {
924        FlowTickManager {
925            start: Instant::now(),
926            start_timestamp: SystemTime::now()
927                .duration_since(SystemTime::UNIX_EPOCH)
928                .unwrap()
929                .as_millis() as repr::Timestamp,
930        }
931    }
932
933    /// Return the current timestamp in milliseconds
934    ///
935    /// TODO(discord9): reconsider since `tick()` require a monotonic clock and also need to survive recover later
936    pub fn tick(&self) -> repr::Timestamp {
937        let current = Instant::now();
938        let since_the_epoch = current - self.start;
939        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
940    }
941}