#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;

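/// Name of the placeholder time index column that is appended to auto-created
/// sink tables when the flow's output schema has no time index of its own.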
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

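/// Name of the `update_at` timestamp column that is appended to auto-created
/// sink tables to record when each row was written back.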
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

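/// Configuration for the flow engine.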
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    /// Number of flow workers; defaults to half the CPU count, but at least one.
    pub num_workers: usize,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
        }
    }
}

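/// Options for a flownode instance. Every field has a default, and
/// `#[serde(default)]` lets a partial configuration file omit any of them.
///
/// A minimal sketch of programmatic use (hypothetical snippet; `validate_sanitize`
/// comes from the `Configurable` impl below):
///
/// ```rust,ignore
/// let mut opts = FlownodeOptions::default();
/// opts.flow.num_workers = 0;
/// // Sanitizing replaces the invalid worker count with the CPU-based default.
/// opts.validate_sanitize().unwrap();
/// assert!(opts.flow.num_workers >= 1);
/// ```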
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            query: QueryOptions { parallelism: 1 },
            user_provider: None,
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}

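/// Shared reference to the streaming engine.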
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

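/// The streaming-mode flow engine: it schedules flows onto dataflow workers,
/// feeds them source data, and writes their output back through the frontend.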
pub struct StreamingEngine {
    /// Handles to the dataflow workers owned by this engine.
    pub worker_handles: Vec<WorkerHandle>,
    /// Index used to pick which worker receives the next new flow.
    worker_selector: Mutex<usize>,
    /// The query engine used to plan SQL into flow plans.
    pub query_engine: Arc<dyn QueryEngine>,
    /// Source of table ids, names and schemas, backed by the table metadata manager.
    table_info_source: ManagedTableSource,
    /// Invoker for writing results back through the frontend.
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Per-node state such as table-name-to-id mappings and sink/source channels.
    node_context: RwLock<FlownodeContext>,
    /// Ongoing refill tasks, keyed by flow id.
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    /// Error collectors, one per flow.
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    /// Watchers over each source table's send-buffer length.
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    /// Supplies the current timestamp used to tick the dataflow.
    tick_manager: FlowTickManager,
    /// Id of this flownode, if assigned.
    pub node_id: Option<u32>,
    /// Lock taken exclusively while flushing, so writes and flushes serialize.
    flush_lock: RwLock<()>,
    /// Receiver for state-size report requests.
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

impl StreamingEngine {
    /// Set the frontend invoker used for writeback.
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create a streaming engine with no workers attached yet.
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    /// Attach the handler that receives state-size report requests (builder style).
    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a streaming engine together with `num_workers` dataflow workers.
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Attach a worker handle to the engine.
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}

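/// A batch of rows to insert into, or delete from, a sink table, each row
/// tagged with a timestamp.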
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

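/// Convert a list of [`Batch`]es into insert requests, one [`DiffRequest::Insert`]
/// per batch, with every row stamped at timestamp `0`.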
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

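// Writeback: draining flow outputs and sending them to sink tables.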
impl StreamingEngine {
    /// Drain all sink receivers and write the accumulated rows back to their
    /// sink tables through the frontend. Returns the number of requests sent.
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    // The sink table may have been dropped after the flow was
                    // created; in that case skip the writeback instead of failing.
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // If the sink table has an auto-created `update_at`
                                // column next, fill it with the current tick.
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // If the table has an auto-created placeholder time
                                // index, fill it with a zero timestamp.
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

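    /// Collect pending output from every sink receiver, grouped by sink table name.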
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

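    /// Fetch a table's primary key names, time index position, and column schemas
    /// by name; returns `None` if the table does not exist.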
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .with_context(|| UnexpectedSnafu {
                    reason: format!("Table id = {}, couldn't find table info", table_id),
                })?;
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

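    /// Derive the schema for an auto-created sink table from the flow's output
    /// relation: append an `update_at` timestamp column and, when the output has
    /// no time index, a placeholder time index column. Returns the primary keys,
    /// the full column list, and whether the placeholder time index was added.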
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // Use the first key set's columns as primary keys, falling back to
        // generated names for unnamed columns.
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // Add a placeholder time index column if the flow's output schema has none.
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

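// Flow runtime: the background loop that drives workers and writeback.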
impl StreamingEngine {
    /// Spawn a task that answers state-size report requests, if a handler was set.
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

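    /// Spawn the engine's main loop as a background task; `shutdown` stops it.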
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

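    /// Log all errors accumulated by the per-flow error collectors.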
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has the following errors: {}", f_id, all_errors);
            }
        }
    }

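    /// The main loop: repeatedly run available dataflow work, write results back,
    /// and adaptively sleep based on the observed processing speed, until
    /// `shutdown` fires.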
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        // Moving average of rows processed per second, used to size the next wait.
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            // Check whether a shutdown has been requested.
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel lagged by {}, meaning multiple shutdown commands have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // Estimate the processing speed with a damped moving average: jump up
            // immediately when the current speed is higher, decay slowly otherwise.
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // Wait roughly long enough for one batch to accumulate, capped at the
            // default interval.
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            // Trace the speed estimate only once every `run_per_trace` iterations.
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // Release the frontend invoker on exit so the engine does not keep the
        // frontend alive.
        self.frontend_invoker.write().await.take();
    }

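    /// Run all available dataflow work once and flush source send buffers.
    ///
    /// If `blocking` is false and the node context is contended, returns early.
    /// Returns the number of rows flushed.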
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            worker.run_available(now, blocking).await?;
        }
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

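    /// Dispatch incoming insert rows for the given region to the flows reading
    /// from its table.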
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id,
            rows_len
        );
        Ok(())
    }
}

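// Flow lifecycle: creating, removing and flushing flows.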
impl StreamingEngine {
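    /// Remove a flow from whichever worker hosts it, then drop its node-context state.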
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

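    /// Create a flow from [`CreateFlowArgs`]: register source and sink tables in
    /// the node context, plan the SQL, verify the sink table's schema (creating
    /// the table if it does not exist), and install the flow on a worker.
    /// Returns the flow id on success.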
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        // Assign global ids to the source and sink tables.
        let mut node_ctx = self.node_context.write().await;
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // Construct the flow plan from the SQL.
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // If the sink table already exists, check its schema against the flow's
        // output; otherwise create it from the flow plan's schema.
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // Compare column by column; an extra trailing timestamp column on the
            // sink table side is tolerated.
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {} (sink table name '{}', flow inferred name '{}') data type mismatch: expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // The sink table has an extra timestamp column (e.g. an
                        // auto-created time index); allow it.
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "Schema length mismatch: expect {} got {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // Create the sink table using the flow plan's schema.
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // Hand the flow over to a worker.
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

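    /// Flush a flow: drain pending input, run the dataflow to completion, and
    /// write the results back. Returns the number of rows written back.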
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // Acquire and immediately release the flush lock's write half, which waits
        // for in-flight writes (holding read locks) to finish first.
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let rows_written = self.send_writeback_requests().await?;
        debug!(
            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, rows_written
        );
        Ok(rows_written)
    }

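    /// Check whether any worker hosts the given flow.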
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

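/// Provides the current timestamp for the flow engine.
///
/// Ticks are computed from a monotonic [`Instant`] anchored to the wall-clock
/// time captured at creation, so they never go backwards even if the system
/// clock changes. A minimal sketch of the guarantee (hypothetical usage):
///
/// ```rust,ignore
/// let tm = FlowTickManager::new();
/// let a = tm.tick();
/// let b = tm.tick();
/// assert!(b >= a); // ticks derive from a monotonic clock
/// ```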
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// Monotonic anchor taken at creation time.
    start: Instant,
    /// Wall-clock milliseconds since the Unix epoch at creation time.
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds: the wall-clock time at
    /// creation plus the monotonic time elapsed since then.
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let elapsed = current - self.start;
        elapsed.as_millis() as repr::Timestamp + self.start_timestamp
    }
}