1#![warn(unused_imports)]
18
19use std::collections::BTreeMap;
20use std::sync::Arc;
21use std::time::{Duration, Instant, SystemTime};
22
23use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
24use common_base::memory_limit::MemoryLimit;
25use common_config::Configurable;
26use common_error::ext::BoxedError;
27use common_meta::key::TableMetadataManagerRef;
28use common_options::memory::MemoryOptions;
29use common_runtime::JoinHandle;
30use common_stat::get_total_cpu_cores;
31use common_telemetry::logging::{LoggingOptions, TracingOptions};
32use common_telemetry::{debug, info, trace};
33use datatypes::schema::ColumnSchema;
34use datatypes::value::Value;
35use greptime_proto::v1;
36use itertools::{EitherOrBoth, Itertools};
37use meta_client::MetaClientOptions;
38use query::QueryEngine;
39use query::options::QueryOptions;
40use serde::{Deserialize, Serialize};
41use servers::grpc::GrpcOptions;
42use servers::http::HttpOptions;
43use session::context::QueryContext;
44use snafu::{OptionExt, ResultExt, ensure};
45use store_api::storage::{ConcreteDataType, RegionId};
46use table::metadata::TableId;
47use tokio::sync::broadcast::error::TryRecvError;
48use tokio::sync::{Mutex, RwLock, broadcast, watch};
49
50pub(crate) use crate::adapter::node_context::FlownodeContext;
51use crate::adapter::refill::RefillTask;
52use crate::adapter::table_source::ManagedTableSource;
53use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
54pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
55use crate::batching_mode::BatchingModeOptions;
56use crate::compute::ErrCollector;
57use crate::df_optimizer::sql_to_flow_plan;
58use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
59use crate::expr::Batch;
60use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
61use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
62use crate::{CreateFlowArgs, FlowId, TableName};
63
64pub(crate) mod flownode_impl;
65mod parse_expr;
66pub(crate) mod refill;
67mod stat;
68#[cfg(test)]
69mod tests;
70pub(crate) mod util;
71mod worker;
72
73pub(crate) mod node_context;
74pub(crate) mod table_source;
75
76use crate::FrontendInvoker;
77use crate::error::Error;
78
/// Name of the placeholder time-index column that is appended to the schema of an
/// auto-created sink table when the flow's output has no time index of its own.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
81
/// Name of the millisecond-timestamp column that is appended to the schema of an
/// auto-created sink table.
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
83
/// Flow-related configuration of a flownode.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form are taken
/// from [`FlowConfig::default`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    /// Number of workers. Defaults to half of the total CPU cores, but at least 1.
    pub num_workers: usize,
    /// Options for batching mode.
    pub batching_mode: BatchingModeOptions,
}
91
92impl Default for FlowConfig {
93 fn default() -> Self {
94 Self {
95 num_workers: (get_total_cpu_cores() / 2).max(1),
96 batching_mode: BatchingModeOptions::default(),
97 }
98 }
99}
100
/// Top-level options of a flownode.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form are taken
/// from [`FlownodeOptions::default`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    /// Optional id of this node; `None` by default.
    pub node_id: Option<u64>,
    /// Flow-related configuration (worker count, batching mode).
    pub flow: FlowConfig,
    /// gRPC server options; the default bind address is `127.0.0.1:3004`.
    pub grpc: GrpcOptions,
    /// HTTP server options.
    pub http: HttpOptions,
    /// Optional meta client options; `None` by default.
    pub meta_client: Option<MetaClientOptions>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query options; by default parallelism is 1 and query fallback is disabled.
    pub query: QueryOptions,
    /// Memory options.
    pub memory: MemoryOptions,
}
115
116impl Default for FlownodeOptions {
117 fn default() -> Self {
118 Self {
119 node_id: None,
120 flow: FlowConfig::default(),
121 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
122 http: HttpOptions::default(),
123 meta_client: None,
124 logging: LoggingOptions::default(),
125 tracing: TracingOptions::default(),
126 query: QueryOptions {
129 parallelism: 1,
130 allow_query_fallback: false,
131 memory_pool_size: MemoryLimit::default(),
132 enable_per_region_metrics: false,
133 },
134 memory: MemoryOptions::default(),
135 }
136 }
137}
138
139impl Configurable for FlownodeOptions {
140 fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
141 if self.flow.num_workers == 0 {
142 self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
143 }
144 Ok(())
145 }
146}
147
/// Shared, reference-counted handle to a [`StreamingEngine`].
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;
150
/// The streaming flow engine of a flownode.
///
/// It owns the handles of the flow workers, the shared flownode context and the
/// frontend invoker that is used to write flow results back to their sink tables.
pub struct StreamingEngine {
    /// Handles of the workers; filled through `add_worker_handle`.
    pub worker_handles: Vec<WorkerHandle>,
    /// NOTE(review): presumably the index used to pick the worker for a newly created
    /// flow — it is not read in this part of the file; confirm against
    /// `get_worker_handle_for_create_flow`.
    worker_selector: Mutex<usize>,
    /// Query engine passed to `sql_to_flow_plan` when a flow is created.
    pub query_engine: Arc<dyn QueryEngine>,
    /// Table id / table info lookup, built from the table metadata manager.
    table_info_source: ManagedTableSource,
    /// Frontend invoker used for writeback. `None` until `set_frontend_invoker` is
    /// called, and taken out again when `run` exits.
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Shared flownode context (source senders, sink receivers, table id mapping and
    /// flow plans).
    node_context: RwLock<FlownodeContext>,
    /// Refill tasks keyed by flow id.
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    /// Error collectors keyed by flow id; their content is reported by `log_all_errors`.
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    /// NOTE(review): presumably watches the send-buffer length of each source table —
    /// not used in this part of the file; confirm.
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    /// Produces the millisecond timestamps handed to the workers and used for writeback.
    tick_manager: FlowTickManager,
    /// Optional id of this node.
    pub node_id: Option<u32>,
    /// Lock that `flush_flow_inner` acquires in write mode and releases immediately.
    flush_lock: RwLock<()>,
}
180
181impl StreamingEngine {
183 pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
185 *self.frontend_invoker.write().await = Some(frontend);
186 }
187
188 pub fn new(
190 node_id: Option<u32>,
191 query_engine: Arc<dyn QueryEngine>,
192 table_meta: TableMetadataManagerRef,
193 ) -> Self {
194 let srv_map = ManagedTableSource::new(
195 table_meta.table_info_manager().clone(),
196 table_meta.table_name_manager().clone(),
197 );
198 let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
199 let tick_manager = FlowTickManager::new();
200 let worker_handles = Vec::new();
201 StreamingEngine {
202 worker_handles,
203 worker_selector: Mutex::new(0),
204 query_engine,
205 table_info_source: srv_map,
206 frontend_invoker: RwLock::new(None),
207 node_context: RwLock::new(node_context),
208 refill_tasks: Default::default(),
209 flow_err_collectors: Default::default(),
210 src_send_buf_lens: Default::default(),
211 tick_manager,
212 node_id,
213 flush_lock: RwLock::new(()),
214 }
215 }
216
217 pub fn new_with_workers<'s>(
219 node_id: Option<u32>,
220 query_engine: Arc<dyn QueryEngine>,
221 table_meta: TableMetadataManagerRef,
222 num_workers: usize,
223 ) -> (Self, Vec<Worker<'s>>) {
224 let mut zelf = Self::new(node_id, query_engine, table_meta);
225
226 let workers: Vec<_> = (0..num_workers)
227 .map(|_| {
228 let (handle, worker) = create_worker();
229 zelf.add_worker_handle(handle);
230 worker
231 })
232 .collect();
233 (zelf, workers)
234 }
235
236 pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
238 self.worker_handles.push(handle);
239 }
240}
241
/// A batch of row changes for one table, each row paired with a timestamp.
#[derive(Debug)]
pub enum DiffRequest {
    /// Rows to insert.
    Insert(Vec<(Row, repr::Timestamp)>),
    /// Rows to delete.
    Delete(Vec<(Row, repr::Timestamp)>),
}
247
248impl DiffRequest {
249 pub fn len(&self) -> usize {
250 match self {
251 Self::Insert(v) => v.len(),
252 Self::Delete(v) => v.len(),
253 }
254 }
255
256 pub fn is_empty(&self) -> bool {
257 self.len() == 0
258 }
259}
260
261pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
262 let mut reqs = Vec::new();
263 for batch in batches {
264 let mut rows = Vec::with_capacity(batch.row_count());
265 for i in 0..batch.row_count() {
266 let row = batch.get_row(i).context(EvalSnafu)?;
267 rows.push((Row::new(row), 0));
268 }
269 reqs.push(DiffRequest::Insert(rows));
270 }
271 Ok(reqs)
272}
273
274impl StreamingEngine {
276 pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
278 let all_reqs = self.generate_writeback_request().await?;
279 if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
280 return Ok(0);
281 }
282 let mut req_cnt = 0;
283 for (table_name, reqs) in all_reqs {
284 if reqs.is_empty() {
285 continue;
286 }
287 let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
288 let ctx = Arc::new(QueryContext::with(&catalog, &schema));
289
290 let (is_ts_placeholder, proto_schema) = match self
291 .try_fetch_existing_table(&table_name)
292 .await?
293 .context(UnexpectedSnafu {
294 reason: format!("Table not found: {}", table_name.join(".")),
295 }) {
296 Ok(r) => r,
297 Err(e) => {
298 if self
299 .table_info_source
300 .get_opt_table_id_from_name(&table_name)
301 .await?
302 .is_none()
303 {
304 common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
307 continue;
308 } else {
309 return Err(e);
310 }
311 }
312 };
313 let schema_len = proto_schema.len();
314
315 let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
316 trace!(
317 "Sending {} writeback requests to table {}, reqs total rows={}",
318 reqs.len(),
319 table_name.join("."),
320 reqs.iter().map(|r| r.len()).sum::<usize>()
321 );
322
323 METRIC_FLOW_ROWS
324 .with_label_values(&["out-streaming"])
325 .inc_by(total_rows as u64);
326
327 let now = self.tick_manager.tick();
328 for req in reqs {
329 match req {
330 DiffRequest::Insert(insert) => {
331 let rows_proto: Vec<v1::Row> = insert
332 .into_iter()
333 .map(|(mut row, _ts)| {
334 if row.len() < proto_schema.len()
337 && proto_schema[row.len()].datatype
338 == greptime_proto::v1::ColumnDataType::TimestampMillisecond
339 as i32
340 {
341 row.extend([Value::from(
342 common_time::Timestamp::new_millisecond(now),
343 )]);
344 }
345 if is_ts_placeholder {
347 ensure!(
348 row.len() == schema_len - 1,
349 InternalSnafu {
350 reason: format!(
351 "Row len mismatch, expect {} got {}",
352 schema_len - 1,
353 row.len()
354 )
355 }
356 );
357 row.extend([Value::from(
358 common_time::Timestamp::new_millisecond(0),
359 )]);
360 }
361 if row.len() != proto_schema.len() {
362 UnexpectedSnafu {
363 reason: format!(
364 "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
365 proto_schema.len(),
366 row.len(),
367 proto_schema.iter().map(|c|&c.column_name).collect_vec()
368 ),
369 }
370 .fail()?;
371 }
372 Ok(row.into())
373 })
374 .collect::<Result<Vec<_>, Error>>()?;
375 let table_name = table_name.last().unwrap().clone();
376 let req = RowInsertRequest {
377 table_name,
378 rows: Some(v1::Rows {
379 schema: proto_schema.clone(),
380 rows: rows_proto,
381 }),
382 };
383 req_cnt += 1;
384 self.frontend_invoker
385 .read()
386 .await
387 .as_ref()
388 .with_context(|| UnexpectedSnafu {
389 reason: "Expect a frontend invoker for flownode to write back",
390 })?
391 .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
392 .await
393 .map_err(BoxedError::new)
394 .with_context(|_| ExternalSnafu {})?;
395 }
396 DiffRequest::Delete(remove) => {
397 info!("original remove rows={:?}", remove);
398 let rows_proto: Vec<v1::Row> = remove
399 .into_iter()
400 .map(|(mut row, _ts)| {
401 row.extend(Some(Value::from(
402 common_time::Timestamp::new_millisecond(0),
403 )));
404 row.into()
405 })
406 .collect::<Vec<_>>();
407 let table_name = table_name.last().unwrap().clone();
408 let req = RowDeleteRequest {
409 table_name,
410 rows: Some(v1::Rows {
411 schema: proto_schema.clone(),
412 rows: rows_proto,
413 }),
414 };
415
416 req_cnt += 1;
417 self.frontend_invoker
418 .read()
419 .await
420 .as_ref()
421 .with_context(|| UnexpectedSnafu {
422 reason: "Expect a frontend invoker for flownode to write back",
423 })?
424 .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
425 .await
426 .map_err(BoxedError::new)
427 .with_context(|_| ExternalSnafu {})?;
428 }
429 }
430 }
431 }
432 Ok(req_cnt)
433 }
434
435 pub async fn generate_writeback_request(
437 &self,
438 ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
439 trace!("Start to generate writeback request");
440 let mut output = BTreeMap::new();
441 let mut total_row_count = 0;
442 for (name, sink_recv) in self
443 .node_context
444 .write()
445 .await
446 .sink_receiver
447 .iter_mut()
448 .map(|(n, (_s, r))| (n, r))
449 {
450 let mut batches = Vec::new();
451 while let Ok(batch) = sink_recv.try_recv() {
452 total_row_count += batch.row_count();
453 batches.push(batch);
454 }
455 let reqs = batches_to_rows_req(batches)?;
456 output.insert(name.clone(), reqs);
457 }
458 trace!("Prepare writeback req: total row count={}", total_row_count);
459 Ok(output)
460 }
461
462 async fn fetch_table_pk_schema(
464 &self,
465 table_name: &TableName,
466 ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
467 if let Some(table_id) = self
468 .table_info_source
469 .get_opt_table_id_from_name(table_name)
470 .await?
471 {
472 let table_info = self
473 .table_info_source
474 .get_table_info_value(&table_id)
475 .await?
476 .unwrap();
477 let meta = table_info.table_info.meta;
478 let schema = meta.schema.column_schemas().to_vec();
479 let primary_keys = meta
480 .primary_key_indices
481 .into_iter()
482 .map(|i| schema[i].name.clone())
483 .collect_vec();
484 let time_index = meta.schema.timestamp_index();
485 Ok(Some((primary_keys, time_index, schema)))
486 } else {
487 Ok(None)
488 }
489 }
490
491 async fn adjust_auto_created_table_schema(
496 &self,
497 schema: &RelationDesc,
498 ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
499 let primary_keys = schema
503 .typ()
504 .keys
505 .first()
506 .map(|v| {
507 v.column_indices
508 .iter()
509 .map(|i| {
510 schema
511 .get_name(*i)
512 .clone()
513 .unwrap_or_else(|| format!("col_{i}"))
514 })
515 .collect_vec()
516 })
517 .unwrap_or_default();
518 let update_at = ColumnSchema::new(
519 AUTO_CREATED_UPDATE_AT_TS_COL,
520 ConcreteDataType::timestamp_millisecond_datatype(),
521 true,
522 );
523
524 let original_schema = relation_desc_to_column_schemas_with_fallback(schema);
525
526 let mut with_auto_added_col = original_schema.clone();
527 with_auto_added_col.push(update_at);
528
529 let no_time_index = schema.typ().time_index.is_none();
531 if no_time_index {
532 let ts_col = ColumnSchema::new(
533 AUTO_CREATED_PLACEHOLDER_TS_COL,
534 ConcreteDataType::timestamp_millisecond_datatype(),
535 true,
536 )
537 .with_time_index(true);
538 with_auto_added_col.push(ts_col);
539 }
540
541 Ok((primary_keys, with_auto_added_col, no_time_index))
542 }
543}
544
545impl StreamingEngine {
547 pub fn run_background(
549 self: Arc<Self>,
550 shutdown: Option<broadcast::Receiver<()>>,
551 ) -> JoinHandle<()> {
552 info!("Starting flownode manager's background task");
553 common_runtime::spawn_global(async move { self.run(shutdown).await })
554 }
555
556 pub async fn log_all_errors(&self) {
558 for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
559 let all_errors = f_err.get_all().await;
560 if !all_errors.is_empty() {
561 let all_errors = all_errors
562 .into_iter()
563 .map(|i| format!("{:?}", i))
564 .join("\n");
565 common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
566 }
567 }
568 }
569
    /// Main loop of the engine.
    ///
    /// Each iteration runs the available work on all workers, sends the writeback
    /// requests, logs collected flow errors, checks the optional `shutdown` channel and
    /// then waits. The wait time adapts to the observed throughput and never exceeds
    /// one second. Errors from running or writing back are logged, not propagated.
    ///
    /// The loop ends when a shutdown message is received, or when the shutdown channel
    /// is closed or lagged. On exit the frontend invoker is taken out of the engine.
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // Missed ticks are delivered in a burst.
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        // Smoothed throughput in rows per second.
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        // Emit the trace lines below only about once every `run_per_trace` iterations.
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            // A failed run is logged and counted as zero rows.
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err;"Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err;"Send writeback request errors");
            };
            self.log_all_errors().await;

            // Non-blocking check of the shutdown channel; without a channel the loop
            // never stops by itself.
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown cmd have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // Throughput since the previous iteration's wait started, in rows per second.
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // Follow increases immediately, decay slowly (weighted 9:1 towards the old
            // average).
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // Time in milliseconds to accumulate `BATCH_SIZE` rows at the average speed,
            // capped at the default interval.
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            // Continue as soon as either the periodic tick or the adaptive wait is over.
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // Take the frontend invoker out of the engine so it is dropped here.
        self.frontend_invoker.write().await.take();
    }
653
654 pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
662 let mut row_cnt = 0;
663
664 let now = self.tick_manager.tick();
665 for worker in self.worker_handles.iter() {
666 worker.run_available(now, blocking).await?;
668 }
669 let flush_res = if blocking {
671 let ctx = self.node_context.read().await;
672 ctx.flush_all_sender().await
673 } else {
674 match self.node_context.try_read() {
675 Ok(ctx) => ctx.flush_all_sender().await,
676 Err(_) => return Ok(row_cnt),
677 }
678 };
679 match flush_res {
680 Ok(r) => {
681 common_telemetry::trace!("Total flushed {} rows", r);
682 row_cnt += r;
683 }
684 Err(err) => {
685 common_telemetry::error!("Flush send buf errors: {:?}", err);
686 }
687 };
688
689 Ok(row_cnt)
690 }
691
692 pub async fn handle_write_request(
694 &self,
695 region_id: RegionId,
696 rows: Vec<DiffRow>,
697 batch_datatypes: &[ConcreteDataType],
698 ) -> Result<(), Error> {
699 let rows_len = rows.len();
700 let table_id = region_id.table_id();
701 let _timer = METRIC_FLOW_INSERT_ELAPSED
702 .with_label_values(&[table_id.to_string().as_str()])
703 .start_timer();
704 self.node_context
705 .read()
706 .await
707 .send(table_id, rows, batch_datatypes)
708 .await?;
709 trace!(
710 "Handling write request for table_id={} with {} rows",
711 table_id, rows_len
712 );
713 Ok(())
714 }
715}
716
717impl StreamingEngine {
719 pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
721 for handle in self.worker_handles.iter() {
722 if handle.contains_flow(flow_id).await? {
723 handle.remove_flow(flow_id).await?;
724 break;
725 }
726 }
727 self.node_context.write().await.remove_flow(flow_id);
728 Ok(())
729 }
730
    /// Creates a flow from `args` and hands it to a worker.
    ///
    /// The node context stays write-locked for the whole call. The SQL is planned into a
    /// flow plan; if the sink table already exists its schema is checked against the
    /// plan's schema, otherwise the sink table is created from the plan's schema.
    ///
    /// # Returns
    /// `Ok(Some(flow_id))` once the worker accepted the flow.
    ///
    /// # Errors
    /// Fails if table registration or planning fails, if the existing sink table's
    /// schema does not match, if the sink table cannot be created, or if the worker
    /// rejects the flow.
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // Register every source table (by id) and the sink table (by name) in the context.
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            // The sink table exists: compare the schema inferred from the plan with the
            // real one, column by column.
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    // Extra trailing timestamp columns in the real table are accepted.
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        continue;
                    }
                    // Any other length difference is an error.
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // The sink table does not exist yet: create it from the plan's schema.
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        // `comment` and `flow_options` are not used here.
        let _ = comment;
        let _ = flow_options;

        // Resolve the sink sender and the source receivers through their global ids.
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Keep a clone of the collector so the flow's errors can be logged later.
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully create flow with id={}", flow_id);
        Ok(Some(flow_id))
    }
876
877 pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
878 debug!("Starting to flush flow_id={:?}", flow_id);
879 drop(self.flush_lock.write().await);
882 let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
883 let rows_send = self.run_available(true).await?;
884 let row = self.send_writeback_requests().await?;
885 debug!(
886 "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
887 flow_id, flushed_input_rows, rows_send, row
888 );
889 Ok(row)
890 }
891
892 pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
893 let mut exist = false;
894 for handle in self.worker_handles.iter() {
895 if handle.contains_flow(flow_id).await? {
896 exist = true;
897 break;
898 }
899 }
900 Ok(exist)
901 }
902}
903
/// Produces millisecond timestamps by adding the time elapsed since its creation to
/// the wall-clock time captured at creation.
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// Instant captured when the manager was created.
    start: Instant,
    /// Milliseconds since the Unix epoch, captured when the manager was created.
    start_timestamp: repr::Timestamp,
}
915
916impl Default for FlowTickManager {
917 fn default() -> Self {
918 Self::new()
919 }
920}
921
922impl FlowTickManager {
923 pub fn new() -> Self {
924 FlowTickManager {
925 start: Instant::now(),
926 start_timestamp: SystemTime::now()
927 .duration_since(SystemTime::UNIX_EPOCH)
928 .unwrap()
929 .as_millis() as repr::Timestamp,
930 }
931 }
932
933 pub fn tick(&self) -> repr::Timestamp {
937 let current = Instant::now();
938 let since_the_epoch = current - self.start;
939 since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
940 }
941}