#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{Mutex, RwLock, broadcast, watch};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::FrontendInvoker;
use crate::error::Error;
use crate::utils::StateReportHandler;

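/// Name of the placeholder time-index column appended to auto-created sink
/// tables that have no time index of their own; every written-back row gets a
/// zero timestamp in it.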
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
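/// Name of the `update_at` timestamp column automatically appended to sink
/// tables; it is filled with the engine's current tick on writeback.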
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
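/// Configuration for the flow engine: the number of streaming workers and the
/// batching-mode options.
///
/// A config file sketch (key names follow the `serde` field names; both fields
/// are optional thanks to `#[serde(default)]`, and the `batching_mode` keys
/// are omitted here):
///
/// ```toml
/// [flow]
/// num_workers = 4
/// ```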
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            // default to half the logical CPUs, but always at least one worker
            num_workers: (common_config::utils::get_cpus() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}
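/// Options for the flownode process, covering networking, logging, heartbeat,
/// query execution, and the flow engine configuration above.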
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            query: QueryOptions {
                // flow queries run single-threaded and without query fallback
                parallelism: 1,
                allow_query_fallback: false,
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
        }
        Ok(())
    }
}

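/// Shared reference to the streaming engine.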
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

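/// The streaming engine of the flownode: keeps the state of all running flows,
/// routes incoming writes to the dataflow workers, and periodically writes
/// computed results back to the sink tables through the frontend.
///
/// A construction sketch (hypothetical setup: `query_engine` and `table_meta`
/// must come from elsewhere, and the returned workers still need to be driven
/// by the caller):
///
/// ```ignore
/// let (engine, workers) = StreamingEngine::new_with_workers(None, query_engine, table_meta, 2);
/// let engine = Arc::new(engine);
/// // spawn the main loop; pass a broadcast receiver instead of None to allow shutdown
/// let _handle = engine.clone().run_background(None);
/// ```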
pub struct StreamingEngine {
    /// Handles to the dataflow workers (the workers themselves are not `Send`,
    /// so they are driven through handles)
    pub worker_handles: Vec<WorkerHandle>,
    /// Cursor for selecting which worker runs the next created flow
    worker_selector: Mutex<usize>,
    /// Query engine used to parse SQL and plan flows
    pub query_engine: Arc<dyn QueryEngine>,
    /// Resolves table names, ids, and schemas from table metadata
    table_info_source: ManagedTableSource,
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Per-node state: table name/id mappings plus source and sink channels
    node_context: RwLock<FlownodeContext>,
    /// Ongoing refill tasks, keyed by flow id
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    pub node_id: Option<u32>,
    /// Lock used to order flushes with respect to in-flight writes
    flush_lock: RwLock<()>,
    /// Receives requests to report the engine's state size
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

impl StreamingEngine {
    /// Set the frontend invoker used to write results back
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create a new streaming engine with no workers attached yet
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create the engine together with `num_workers` workers; the returned
    /// `Worker`s still need to be run by the caller
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Register a handle to a worker
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}
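/// A batch of row changes of one kind (inserts or deletes), each row paired
/// with the timestamp of the change.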
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

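/// Convert computed output `Batch`es into one insert request per batch; each
/// row is paired with a zero change-timestamp.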
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

impl StreamingEngine {
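    /// Drain all sink buffers and write the accumulated results back to the
    /// sink tables through the frontend; returns the number of write requests
    /// sent.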
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // the sink table has been dropped; skip this round of writeback
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // if the sink table has a millisecond timestamp column
                                // right after the flow's output columns (`update_at`),
                                // fill it with the current tick
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // auto-created tables end with a placeholder time index
                                // that always receives a zero timestamp
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }
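    /// Drain every sink receiver in the node context and group the buffered
    /// batches into per-table [`DiffRequest`]s.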
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

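    /// Fetch the primary keys, time index position, and column schemas of an
    /// existing table, or `None` if the table does not exist.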
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

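    /// Derive the schema for an auto-created sink table from the flow's output
    /// relation: primary keys from the first key set, an appended `update_at`
    /// column, and, if the output has no time index, a placeholder time-index
    /// column. Returns `(primary_keys, column_schemas, no_time_index)`.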
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // use the first set of keys as the primary keys, falling back to
        // generated names where the relation has no name for a column
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if the output relation has no time index, append a placeholder one
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

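/// Flow runtime methods: the background loop that drives computation, state
/// reporting, and handling of incoming write requests.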
impl StreamingEngine {
    /// Start the state-report handler, which answers state-report requests
    /// with a report of the engine's state; exits when the request channel
    /// closes
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Spawn the engine's main loop onto the global runtime, optionally
    /// listening on `shutdown` for a stop signal
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all errors gathered by the per-flow error collectors
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// The engine's main loop: repeatedly run available dataflows, write
    /// results back, and adaptively choose the next wait interval from the
    /// observed row throughput
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        // moving average of rows processed per second
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            }
            self.log_all_errors().await;

            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel lagged by {}, meaning multiple shutdown commands have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // current speed in rows per second
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // react immediately to speed-ups, decay slowly on slow-downs
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // wait roughly long enough to accumulate one batch, capped at the default interval
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            // only trace every `run_per_trace` iterations to keep the log quiet
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // drop the frontend invoker on exit so writeback stops cleanly
        self.frontend_invoker.write().await.take();
    }

    /// Run all available dataflows once and flush the source send buffers.
    ///
    /// With `blocking == true` this waits for the node-context lock; otherwise
    /// it returns immediately if the lock is contended. Returns the
    /// (approximate) number of rows flushed to the workers.
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            worker.run_available(now, blocking).await?;
        }
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Route a write request for the given region to the flows reading from
    /// that table
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id, rows_len
        );
        Ok(())
    }
}
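/// Flow creation, removal, flushing, and existence checks.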
impl StreamingEngine {
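    /// Remove a flow from whichever worker currently runs it, then drop its
    /// state from the node context.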
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

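    /// Create a new flow from `args`: register its source and sink tables,
    /// plan the SQL into a dataflow, validate (or auto-create) the sink table
    /// schema, and install the plan on a worker. Returns the flow id on
    /// success.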
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // assign global ids to the source and sink tables
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // construct the dataflow plan from the SQL
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // if the sink table already exists, check that the flow's output schema
        // matches it column by column; otherwise create the sink table
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    // trailing timestamp columns in the real table (such as an
                    // auto-added time index) are allowed
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatch, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // sink table does not exist yet: create it from the flow's output relation
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // pick a worker and install the dataflow on it
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }
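    /// Flush a flow: drain its pending input, run all dataflows to completion,
    /// and write the results back; returns the number of writeback requests
    /// sent.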
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // acquire and immediately drop the write lock: this waits for
        // operations holding the flush lock to finish before flushing,
        // without blocking the writes that come after
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Finished flushing flow_id={:?}: {} input rows flushed, {} rows run, {} writeback requests sent",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    /// Return whether any worker currently contains the given flow
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}
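/// FlowTickManager tracks flow progress time. It samples the system clock once
/// at startup and afterwards derives timestamps from a monotonic `Instant`, so
/// ticks never go backwards even if the wall clock does.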
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The instant the manager was created, used to measure elapsed time
    start: Instant,
    /// The system timestamp in milliseconds at creation time
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds since the UNIX epoch,
    /// computed from the monotonic clock
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_start = current - self.start;
        since_start.as_millis() as repr::Timestamp + self.start_timestamp
    }
}