#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_stat::get_total_cpu_cores;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{Mutex, RwLock, broadcast, watch};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::FrontendInvoker;
use crate::error::Error;
use crate::utils::StateReportHandler;

/// Column name of the auto-created placeholder time index column, used when a
/// flow's sink table has no time index of its own.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

/// Column name of the auto-created `update_at` timestamp column in sink tables.
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Configuration for the flow engine.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    /// Number of flow workers; defaults to half the CPU cores, with a minimum
    /// of one.
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (get_total_cpu_cores() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

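/// Options for the flownode.
///
/// All fields have defaults via `#[serde(default)]`; a minimal construction
/// sketch overriding a couple of them (values are illustrative):
///
/// ```rust,ignore
/// let opts = FlownodeOptions {
///     node_id: Some(14),
///     flow: FlowConfig {
///         num_workers: 4,
///         ..Default::default()
///     },
///     ..Default::default()
/// };
/// ```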
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            query: QueryOptions {
                parallelism: 1,
                allow_query_fallback: false,
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
        }
        Ok(())
    }
}

/// Arc-ed reference to the streaming flow engine.
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// The streaming mode flow engine. Tracks the mapping between flows and
/// workers and drives dataflow computation.
pub struct StreamingEngine {
    /// Handles to the flow workers.
    pub worker_handles: Vec<WorkerHandle>,
    /// Cursor used to pick a worker for a newly created flow.
    worker_selector: Mutex<usize>,
    /// The query engine that translates SQL into flow plans.
    pub query_engine: Arc<dyn QueryEngine>,
    /// Source of table schema and metadata.
    table_info_source: ManagedTableSource,
    /// Invoker for writing flow results back through the frontend.
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
    /// Mappings from table names to global ids, and other per-node state.
    node_context: RwLock<FlownodeContext>,
    /// Tasks that refill flows with historical data.
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
    /// Per-flow error collectors.
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
    /// Watchers over the source send buffer lengths.
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
    tick_manager: FlowTickManager,
    pub node_id: Option<u32>,
    /// Lock used during flushing, to prevent new writes from interleaving
    /// with a flush.
    flush_lock: RwLock<()>,
    /// Receives requests to report the sizes of the engine's state.
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

impl StreamingEngine {
    /// Set the frontend invoker used for writing flow results back.
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create a new streaming engine with no workers attached yet.
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    /// Attach a state report handler, consuming and returning the engine.
    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

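    /// Create a new engine along with `num_workers` workers, returning the
    /// engine (holding one handle per worker) and the workers themselves.
    ///
    /// A minimal usage sketch; the thread spawning and `worker.run()` driver
    /// shown here are assumptions for illustration, not prescribed by this
    /// module:
    ///
    /// ```rust,ignore
    /// let (engine, workers) = StreamingEngine::new_with_workers(
    ///     Some(0),      // node id
    ///     query_engine, // Arc<dyn QueryEngine>
    ///     table_meta,   // TableMetadataManagerRef
    ///     4,            // number of workers
    /// );
    /// for mut worker in workers {
    ///     std::thread::spawn(move || worker.run());
    /// }
    /// ```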
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Attach a worker handle to the engine.
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}

252
253#[derive(Debug)]
254pub enum DiffRequest {
255 Insert(Vec<(Row, repr::Timestamp)>),
256 Delete(Vec<(Row, repr::Timestamp)>),
257}
258
259impl DiffRequest {
260 pub fn len(&self) -> usize {
261 match self {
262 Self::Insert(v) => v.len(),
263 Self::Delete(v) => v.len(),
264 }
265 }
266
267 pub fn is_empty(&self) -> bool {
268 self.len() == 0
269 }
270}
271
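/// Convert computed [`Batch`]es into insert [`DiffRequest`]s, one request per
/// batch; the per-row timestamp is set to 0 here and is not consulted during
/// writeback.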
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

impl StreamingEngine {
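    /// Generate writeback requests from the sink receivers and send them to
    /// the frontend for insertion into the sink tables; returns the number of
    /// requests sent.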
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        // The sink table has been dropped; skip the writeback
                        // instead of failing the whole run.
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // Append the current tick when the schema expects a
                                // trailing millisecond timestamp column (e.g. `update_at`).
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // Fill a zero value for the placeholder time index column.
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

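    /// Drain every sink receiver and convert the buffered output batches into
    /// per-table [`DiffRequest`]s.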
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

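    /// Fetch the primary key names, time index position, and column schemas of
    /// an existing table, or `None` if the table doesn't exist.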
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

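    /// Derive the schema for an auto-created sink table from the flow's output
    /// relation: primary keys come from the relation's first key set, an
    /// `update_at` timestamp column is appended, and a `__ts_placeholder` time
    /// index column is added when the plan has no time index of its own.
    ///
    /// A sketch of the resulting layout for a hypothetical flow output (for
    /// illustration only):
    ///
    /// ```text
    /// flow output: (city: String, cnt: UInt64)        -- no time index
    /// sink schema: (city, cnt, update_at, __ts_placeholder)
    ///                          ^ both auto-added millisecond timestamp columns;
    ///                            the placeholder becomes the time index
    /// ```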
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // If the plan has no time index, add a placeholder timestamp column to
        // serve as the sink table's time index.
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

/// Flow runtime methods: background tasks and the main loop.
impl StreamingEngine {
    /// Spawn a task that answers state-size report requests, if a
    /// [`StateReportHandler`] was attached.
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Spawn the engine's main loop in the global runtime and return its join
    /// handle.
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all errors accumulated by the per-flow error collectors.
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

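    /// Run the engine's main loop: repeatedly run available dataflow
    /// computations, write results back, and adaptively pace itself based on
    /// observed throughput.
    ///
    /// A worked example of one pacing step (numbers are illustrative only):
    ///
    /// ```text
    /// row_cnt = 2000 rows processed, 500 ms since the last run:
    ///   cur_spd  = 2000 * 1000 / 500       = 4000 rows/s
    ///   avg_spd  = (9 * 5000 + 4000) / 10  = 4900 rows/s   (previous avg 5000;
    ///              a cur_spd above the average would replace it outright)
    ///   new_wait = BATCH_SIZE * 1000 / 4900 ms, capped at the 1 s default
    /// ```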
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
        // Moving average of rows processed per second, used for adaptive pacing.
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err; "Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err; "Send writeback request errors");
            };
            self.log_all_errors().await;

            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown cmds have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // Estimate throughput: jump up immediately on speedups, decay
            // slowly (9:1 moving average) on slowdowns.
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // Wait roughly long enough for one batch to accumulate, capped at
            // the default interval.
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            // Only emit trace logs every `run_per_trace` iterations to avoid spam.
            if run_cnt >= run_per_trace {
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // Drop the frontend invoker on shutdown.
        self.frontend_invoker.write().await.take();
    }

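    /// Run all available dataflow computations once across the workers and
    /// flush the source send buffers; returns the number of rows flushed.
    /// When `blocking` is false, returns early instead of waiting on the node
    /// context lock.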
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            worker.run_available(now, blocking).await?;
        }
        // Flush source send buffers; in non-blocking mode, bail out early
        // rather than wait for the node context lock.
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

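    /// Route rows written to `region_id`'s table into the flow engine as
    /// source data.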
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id, rows_len
        );
        Ok(())
    }
}

/// Methods for creating, removing, and flushing flows.
impl StreamingEngine {
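    /// Remove a flow from whichever worker owns it, then drop its entry from
    /// the node context.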
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

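    /// Create a flow from [`CreateFlowArgs`]: assign global ids to the source
    /// and sink tables, plan the SQL, verify (or auto-create) the sink table's
    /// schema, and register the dataflow with a worker.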
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // Assign global ids to the source and sink tables.
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // Construct the flow plan from the SQL.
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // If the sink table already exists, check that its schema matches the
        // flow plan's output; otherwise auto-create it.
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // Compare the inferred schema against the real schema column by
            // column, ignoring name differences.
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // Trailing timestamp columns (e.g. auto-added ones) are allowed.
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // The sink table doesn't exist; auto-create it from the flow's
            // output relation.
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        // `comment` and `flow_options` are currently unused in streaming mode.
        let _ = comment;
        let _ = flow_options;

        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // Hand the flow off to a worker.
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully created flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

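    /// Flush a flow: wait for in-flight writes, drain the inputs, run the
    /// dataflow to completion, and write the results back. Returns the number
    /// of rows written back.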
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // Acquire and immediately release the flush lock's write guard: this
        // waits out writes already in flight without blocking writes that
        // arrive afterwards.
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done flushing flow_id={:?}: {} input rows flushed, {} rows sent and {} output rows written back",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

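/// Manages the passage of time for the flow engine.
///
/// The wall-clock time is sampled once at creation and afterwards advanced
/// with the monotonic [`Instant`] clock, so ticks never move backwards even
/// if the system clock is adjusted.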
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The monotonic clock anchor, set at creation.
    start: Instant,
    /// The wall-clock timestamp (ms since the Unix epoch) at creation.
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Return the current timestamp in milliseconds: the anchored wall-clock
    /// time plus the monotonic time elapsed since creation.
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}