#![warn(unused_imports)]

//! Flow engine adapter: drives the streaming dataflow workers, routes
//! incoming source writes to them, and writes flow results back to the
//! sink tables.

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::error::Error;
use crate::utils::StateReportHandler;
use crate::FrontendInvoker;

/// Name of the placeholder timestamp column that is auto-added when the sink
/// table would otherwise have no time index.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

/// Name of the auto-added column recording when a row was last updated.
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
/// Configuration for the flow engine itself.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    /// Number of dataflow workers to spawn.
    pub num_workers: usize,
    /// Options for flows running in batching mode.
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for the flownode process.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            // Keep flow queries single-threaded so they don't starve the
            // rest of the node of CPU.
            query: QueryOptions { parallelism: 1 },
            user_provider: None,
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}
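
// A minimal usage sketch of the sanitizing above (values are illustrative):
// a zero worker count is replaced with half the CPU count, floored at one.
//
// let mut opts = FlownodeOptions::default();
// opts.flow.num_workers = 0;
// opts.validate_sanitize().unwrap();
// assert!(opts.flow.num_workers >= 1);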

/// Arc-wrapped streaming engine, shared across the flownode.
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// The engine that drives streaming-mode flows: it owns the dataflow worker
/// handles and the shared state needed to route data in and out of them.
pub struct StreamingEngine {
    /// Handles to the dataflow workers (the workers themselves are not
    /// `Send`, so they are driven through these handles).
    pub worker_handles: Vec<WorkerHandle>,
    /// Cursor used when selecting which worker gets the next created flow.
    worker_selector: Mutex<usize>,
    /// Query engine used to parse SQL and convert it into a flow plan.
    pub query_engine: Arc<dyn QueryEngine>,
    /// Source of table names and schemas, backed by the table metadata manager.
    table_info_source: ManagedTableSource,
    /// Invoker for writing results back through the frontend; `None` until
    /// `set_frontend_invoker` is called.
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Per-node context: table name/id mappings, source senders and sink receivers.
    node_context: RwLock<FlownodeContext>,
    /// In-flight refill tasks, keyed by flow id.
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    /// Per-flow error collectors.
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    /// Watchers over the source send-buffer lengths, keyed by table id.
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    /// Clock used to timestamp flow runs.
    tick_manager: FlowTickManager,
    pub node_id: Option<u32>,
    /// Lock serializing flushes: writes take it for read, `flush_flow` takes
    /// it for write, so an `inserts -> flush` sequence is handled in order.
    flush_lock: RwLock<()>,
    /// Handler used to report state size back to the heartbeat task.
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

impl StreamingEngine {
    /// Sets the frontend invoker used for writing results back.
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Creates an engine with no workers attached yet; see
    /// `new_with_workers` for a constructor that also creates the workers.
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    /// Attaches the state-report handler (builder style).
    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Creates the engine together with `num_workers` workers; the caller is
    /// responsible for running each returned `Worker` (see the sketch below).
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Attaches a worker handle to this engine.
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}
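
// A minimal sketch of wiring the engine with its workers. Workers are not
// `Send`, so each one is typically pinned to its own thread; `Worker::run`
// as the blocking loop is an assumption about `worker.rs`, not something
// shown in this file:
//
// let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
// for mut worker in workers {
//     std::thread::spawn(move || worker.run());
// }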

/// A batch of rows to either insert into or delete from a sink table, each
/// row paired with the timestamp at which the change takes effect.
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Converts output batches into insert requests, one `DiffRequest::Insert`
/// per batch; every row is given timestamp 0.
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}
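
// Shape of the conversion above, informally: a `Batch` holding rows
// r0..rn becomes `DiffRequest::Insert(vec![(r0, 0), .., (rn, 0)])`, so a
// two-batch input yields a two-element request vector in the same order.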

impl StreamingEngine {
    /// Sends all pending writeback requests to the sink tables and returns
    /// the number of requests sent.
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    // The sink table may have been dropped concurrently; only
                    // skip the writeback if it is really gone.
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // Append the `update_at` timestamp if the
                                // schema expects one more millisecond column.
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // Append the placeholder time-index value for
                                // auto-created sink tables.
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Collects all pending output batches from the sink receivers and
    /// converts them into writeback requests, grouped by sink table name.
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetches the primary keys, time index position and column schemas of
    /// `table_name`, or `None` if the table does not exist.
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Derives the schema for an auto-created sink table from the flow plan's
    /// relation description: always appends an `update_at` column, and, when
    /// the plan has no time index, a placeholder time-index column as well.
    /// Returns the primary keys, the full column schemas, and whether the
    /// placeholder time index was added.
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // Use the first key (if any) as the primary key.
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // If the plan has no time index, add a placeholder timestamp column
        // to serve as one.
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}
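
// Illustration of the adjustment above: a plan schema `(number INT)` with no
// time index comes back as `(number INT, update_at TIMESTAMP_MS,
// __ts_placeholder TIMESTAMP_MS TIME INDEX)` with `no_time_index == true`,
// which is also why the writeback path appends those two values to each row.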

impl StreamingEngine {
    /// Starts the state-report handler: it answers state-size requests from
    /// the heartbeat task and exits when that channel closes.
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Runs the main loop (see `run`) in the global runtime in the background.
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Logs all errors accumulated by the per-flow error collectors.
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Runs the engine until a shutdown signal arrives: triggers the dataflow
    /// workers, sends writeback requests, and adapts its own pacing to the
    /// observed row throughput.
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // Burst catches up on missed ticks as fast as possible.
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        // Weighted average of rows/second, used for pacing below.
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        // Only emit trace logs every `run_per_trace` iterations.
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err;"Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err;"Send writeback request errors");
            };
            self.log_all_errors().await;

            // Check the shutdown channel without blocking.
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!("Shutdown channel is lagged by {}, meaning multiple shutdown cmd have been issued", num);
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // Current speed in rows/second since the last iteration.
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // Ramp up immediately on a speed increase, decay slowly otherwise.
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // Wait roughly long enough for BATCH_SIZE rows to accumulate,
            // but never longer than the default interval.
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // On shutdown, drop the frontend invoker early to break the reference
        // cycle with the frontend in standalone mode.
        self.frontend_invoker.write().await.take();
    }
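
    // Worked example of the pacing in `run`, with illustrative numbers:
    // suppose an iteration flushed row_cnt = 5000 rows and the previous one
    // ended 500 ms earlier. Then cur_spd = 5000 * 1000 / 500 = 10_000 rows/s;
    // avg_spd jumps straight to 10_000 (increases are taken as-is, decreases
    // decay by 9/10); and new_wait = BATCH_SIZE * 1000 / 10_000 ms, capped at
    // one second. The loop thus sleeps just long enough for roughly
    // BATCH_SIZE rows to accumulate before the next run.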

    /// Runs all available dataflows once and flushes the source send buffers,
    /// returning the number of rows flushed. With `blocking == true` it waits
    /// for the workers and the node context lock; otherwise it returns early
    /// if the context is busy.
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            worker.run_available(now, blocking).await?;
        }
        // Flush the source senders so buffered rows reach the workers.
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Routes an incoming write for `region_id` to the corresponding table's
    /// source sender.
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id,
            rows_len
        );
        Ok(())
    }
}
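
// Sketch of the region-to-table routing used in `handle_write_request`: a
// `RegionId` embeds its table id, so writes addressed to any region of a
// table all land in that table's source sender. Assuming the usual
// `store_api::storage::RegionId` constructor:
//
// let region_id = RegionId::new(table_id, 0);
// assert_eq!(region_id.table_id(), table_id);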

impl StreamingEngine {
    /// Removes `flow_id` from whichever worker holds it, then from the node
    /// context.
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Creates a flow: registers its sources and sink, converts the SQL into
    /// a flow plan, checks (or creates) the sink table, and hands the plan to
    /// a worker. Returns the flow id if a new flow was created.
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // Assign global ids to the source and sink tables.
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // Construct the flow plan from the SQL.
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // Check the plan's schema against the sink table if it already
        // exists; otherwise create the sink table from the plan's schema.
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // Column-by-column comparison; trailing extra timestamp columns
            // on the real table are tolerated (see the `Right` arm below).
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    // An extra timestamp column on the sink table (such as an
                    // auto-created time index) is acceptable.
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // The sink table does not exist yet: create it from the plan's
            // schema.
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        // `comment` and `flow_options` are not used by the streaming engine
        // yet; bind them to avoid unused-variable warnings.
        let _ = comment;
        let _ = flow_options;

        // Hook up the sink sender and source receivers for the new flow.
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // Pick a worker and send it the create request.
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully create flow with id={}", flow_id);
        Ok(Some(flow_id))
    }
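
    // A minimal sketch of driving the creation above (field values are
    // illustrative and the surrounding setup is assumed, not shown here):
    //
    // let args = CreateFlowArgs {
    //     flow_id: 1,
    //     sink_table_name: sink_table_name.clone(),   // `TableName`
    //     source_table_ids: vec![source_table_id],    // `Vec<TableId>`
    //     create_if_not_exists: true,
    //     or_replace: false,
    //     expire_after: None,
    //     comment: None,
    //     sql: "SELECT number, ts FROM numbers_input".to_string(),
    //     flow_options: Default::default(),
    //     query_ctx: None,
    // };
    // let created = engine.create_flow_inner(args).await?;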
    /// Flushes `flow_id`: drains the source senders, runs the dataflow to
    /// completion, and writes the results back. Returns the number of output
    /// rows flushed.
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // Take and immediately release the write lock, so writes issued
        // before this flush are fully applied while later writes are not
        // blocked for the whole flush.
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    /// Returns whether any worker currently holds `flow_id`.
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// Tracks the logical time of the flow engine: a wall-clock timestamp is
/// captured once at startup and advanced with a monotonic clock afterwards.
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The monotonic instant captured at startup.
    start: Instant,
    /// The wall-clock timestamp (in milliseconds) captured at startup.
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Returns the current timestamp in milliseconds since the Unix epoch,
    /// computed from the monotonic clock so it never goes backwards.
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}
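
// A small usage sketch of the tick manager above: because `tick()` adds a
// monotonic elapsed duration to the startup timestamp, successive calls are
// non-decreasing even if the system clock steps backwards.
//
// let ticks = FlowTickManager::new();
// let t1 = ticks.tick();
// let t2 = ticks.tick();
// assert!(t2 >= t1);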