#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;

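/// Column name of the placeholder time-index column that is appended when an
/// auto-created sink table has no time index of its own.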
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

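/// Column name of the auto-appended column recording when a row was last
/// updated by the flow.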
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

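/// Configuration for the flow engine itself.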
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    /// Number of dataflow workers; defaults to half the CPU count, but at least one.
    pub num_workers: usize,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
        }
    }
}

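/// Options for running a flownode.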
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            query: QueryOptions::default(),
            user_provider: None,
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        // Treat a zero worker count as "auto" and fall back to the default sizing.
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}

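/// Shared reference to the streaming engine.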
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

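/// The streaming mode flow engine: it manages the dataflow workers, routes
/// source table writes to them, and writes their output back to sink tables.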
pub struct StreamingEngine {
    /// Handles to the background dataflow workers.
    pub worker_handles: Vec<WorkerHandle>,
    /// Selector used to pick a worker when creating a new flow.
    worker_selector: Mutex<usize>,
    /// Query engine used to plan the flows' SQL.
    pub query_engine: Arc<dyn QueryEngine>,
    /// Getter for table schemas and names.
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Contains the mapping from table name to global id, and table schemas.
    node_context: RwLock<FlownodeContext>,
    /// Tasks that refill flows from their source tables.
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    pub node_id: Option<u32>,
    /// Lock held while flushing, so concurrent operations wait for the flush.
    flush_lock: RwLock<()>,
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

impl StreamingEngine {
    /// Set the frontend invoker used to write flow output back to the cluster.
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create a streaming engine with no workers attached yet.
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    /// Attach the handler used to answer state size report requests.
    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create the engine together with `num_workers` workers, which the caller
    /// is responsible for running.
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}

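/// A set of rows to write back, either inserts or deletes, each row paired
/// with the timestamp at which the change takes effect.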
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }
}

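/// Convert the output `Batch`es of a flow into insert `DiffRequest`s, one per
/// batch, with every row stamped at timestamp 0.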
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

impl StreamingEngine {
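    /// Drain the flows' sink buffers and write the results back to the sink
    /// tables through the frontend invoker, returning the number of requests sent.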
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    // The sink table may have been dropped; only skip the
                    // writeback if it really no longer exists.
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // Append an `update_at` value when the schema
                                // expects one more timestamp-millisecond column.
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // Fill the placeholder time index with zero when
                                // the sink table uses an auto-created one.
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

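    /// Collect the buffered output of every flow's sink receiver into
    /// per-table `DiffRequest`s without blocking.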
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

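    /// Fetch the primary keys, time index position, and column schemas of an
    /// existing table, or `None` if the table does not exist.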
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

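    /// Derive primary keys, column schemas, and whether a placeholder time
    /// index is needed for a sink table auto-created from the flow's output
    /// relation. An `update_at` column is always appended, and a placeholder
    /// time index is added when the relation has none.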
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // Use the first key of the relation as the primary key, naming unnamed
        // columns `col_{i}` after their index.
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // Add a placeholder time index column if the relation has no time index.
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

impl StreamingEngine {
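    /// Take the state report handler, if any, and spawn a task that answers
    /// state size report requests until the channel closes.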
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

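    /// Spawn the engine's main loop in the global runtime; the loop exits when
    /// a message arrives on the optional shutdown channel.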
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

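    /// Log all errors accumulated by the flows' error collectors.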
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has the following errors: {}", f_id, all_errors);
            }
        }
    }

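    /// The main loop: repeatedly run available dataflow, write results back,
    /// and adapt the pause between iterations to the observed row throughput,
    /// capped at one second.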
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        // Moving average of rows processed per second.
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!("Shutdown channel lagged by {}, meaning multiple shutdown commands have been issued", num);
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // Current speed in rows per second; raise the average immediately
            // on a spike, but decay it slowly otherwise.
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // Wait just long enough to accumulate roughly one batch of rows.
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // Release the frontend invoker on shutdown.
        self.frontend_invoker.write().await.take();
    }

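    /// Run all available dataflow once and flush the source senders, returning
    /// the number of rows flushed. When `blocking` is false, the flush is
    /// skipped if the node context lock cannot be acquired immediately.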
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            worker.run_available(now, blocking).await?;
        }
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

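    /// Handle rows written to a source table region by forwarding them into
    /// the flow engine's input buffer for that table.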
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id,
            rows_len
        );
        Ok(())
    }
}

impl StreamingEngine {
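    /// Remove a flow from the worker that owns it and from the node context.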
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

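    /// Create a flow: plan its SQL, check or auto-create the sink table, wire
    /// the source and sink channels, and hand the dataflow to a worker.
    ///
    /// Returns the flow id on success.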
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // Assign global ids to the source and sink tables.
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // Check that an existing sink table's schema matches the plan's output
        // schema, or auto-create the sink table if it does not exist yet.
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    // Trailing timestamp columns in the real schema may be the
                    // auto-added `update_at` or placeholder time index.
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatch, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

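    /// Flush a flow: drain its input buffers, run all dataflow to completion,
    /// and write the output back, returning the number of output rows flushed.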
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // Acquire and immediately release the flush lock, so this flush waits
        // for any operation currently holding it.
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?}: {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

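    /// Check whether any worker currently hosts the given flow.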
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

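/// Keeps track of the current time, using a monotonic `Instant` anchored to
/// the wall-clock time at creation so that ticks never go backwards.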
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The monotonic anchor taken at creation time.
    start: Instant,
    /// Wall-clock milliseconds since the Unix epoch at creation time.
    start_timestamp: repr::Timestamp,
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

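    /// Return the current timestamp in milliseconds since the Unix epoch,
    /// computed from the monotonic clock so it never decreases.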
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}
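
// A minimal sketch (not part of the original source) of the property the tick
// manager relies on: timestamps derived from a fixed `Instant` anchor are
// monotonically non-decreasing, unlike raw wall-clock reads.
#[cfg(test)]
mod tick_manager_tests {
    use super::FlowTickManager;

    #[test]
    fn tick_never_goes_backwards() {
        let manager = FlowTickManager::new();
        let first = manager.tick();
        let second = manager.tick();
        // `Instant` is monotonic, so consecutive ticks cannot decrease.
        assert!(second >= first);
    }
}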