#![warn(unused_imports)]

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_base::memory_limit::MemoryLimit;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_options::memory::MemoryOptions;
use common_runtime::JoinHandle;
use common_stat::get_total_cpu_cores;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{Mutex, RwLock, broadcast, watch};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{Worker, WorkerHandle, create_worker};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, BATCH_SIZE, DiffRow, RelationDesc, Row};
use crate::{CreateFlowArgs, FlowId, TableName};

pub(crate) mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod worker;

pub(crate) mod node_context;
pub(crate) mod table_source;

use crate::FrontendInvoker;
use crate::error::Error;
use crate::utils::StateReportHandler;

/// Name of the placeholder time-index column appended to sink tables that are
/// auto-created for a flow whose output schema has no time index; rows written
/// back always carry timestamp 0 in this column.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

/// Name of the `update_at` timestamp column appended to auto-created sink
/// tables, recording when a row was last written back by the flow.
pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";

/// Configuration for the flow engine.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FlowConfig {
    /// Number of dataflow workers; defaults to half the CPU cores, at least 1.
    pub num_workers: usize,
    pub batching_mode: BatchingModeOptions,
}

impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            num_workers: (get_total_cpu_cores() / 2).max(1),
            batching_mode: BatchingModeOptions::default(),
        }
    }
}

/// Options for the flownode.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    pub node_id: Option<u64>,
    pub flow: FlowConfig,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
    pub memory: MemoryOptions,
}

impl Default for FlownodeOptions {
    fn default() -> Self {
        Self {
            node_id: None,
            flow: FlowConfig::default(),
            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
            http: HttpOptions::default(),
            meta_client: None,
            logging: LoggingOptions::default(),
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            // flow queries default to parallelism 1
            query: QueryOptions {
                parallelism: 1,
                allow_query_fallback: false,
                memory_pool_size: MemoryLimit::default(),
            },
            user_provider: None,
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for FlownodeOptions {
    fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
        if self.flow.num_workers == 0 {
            self.flow.num_workers = (get_total_cpu_cores() / 2).max(1);
        }
        Ok(())
    }
}

/// Arc-wrapped [`StreamingEngine`], cheap to clone.
pub type FlowStreamingEngineRef = Arc<StreamingEngine>;

/// The streaming engine: manages the state and streaming-mode execution of all
/// flow tasks on this flownode.
pub struct StreamingEngine {
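    /// Handles to the dataflow workers; the workers themselves are not `Send`
    /// and run on their own threads, so only handles are stored here.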
    pub worker_handles: Vec<WorkerHandle>,
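    /// Selector index used to pick a worker when creating a new flow.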
    worker_selector: Mutex<usize>,
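    /// Query engine used to parse and plan the flows' SQL.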
    pub query_engine: Arc<dyn QueryEngine>,
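    /// Source of table ids, names and schemas, backed by the table metadata manager.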
    table_info_source: ManagedTableSource,
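    /// Invoker for writing results back through the frontend; set after
    /// startup via [`StreamingEngine::set_frontend_invoker`].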
    frontend_invoker: RwLock<Option<FrontendInvoker>>,
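    /// Per-node context: table name/id mappings, schemas and source/sink channels.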
    node_context: RwLock<FlownodeContext>,
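    /// Ongoing refill tasks, keyed by flow id.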
    refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
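    /// Error collector of each flow, keyed by flow id.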
    flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
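    /// Watch receivers for the send-buffer length of each source table.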
    src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
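    /// Clock used to stamp rows with the current flow timestamp.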
    tick_manager: FlowTickManager,
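    /// Id of this flownode, if assigned.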
    pub node_id: Option<u32>,
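    /// Lock taken as `write` by flush operations and as `read` by inserts, so
    /// that an `inserts -> flush` sequence is processed in order.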
    flush_lock: RwLock<()>,
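    /// Receives requests (e.g. from the heartbeat task) to report state sizes.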
    state_report_handler: RwLock<Option<StateReportHandler>>,
}

impl StreamingEngine {
    /// Set the frontend invoker used to write flow results back to sink tables.
    pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
        *self.frontend_invoker.write().await = Some(frontend);
    }

    /// Create a new streaming engine with no workers attached yet.
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let srv_map = ManagedTableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
        let tick_manager = FlowTickManager::new();
        let worker_handles = Vec::new();
        StreamingEngine {
            worker_handles,
            worker_selector: Mutex::new(0),
            query_engine,
            table_info_source: srv_map,
            frontend_invoker: RwLock::new(None),
            node_context: RwLock::new(node_context),
            refill_tasks: Default::default(),
            flow_err_collectors: Default::default(),
            src_send_buf_lens: Default::default(),
            tick_manager,
            node_id,
            flush_lock: RwLock::new(()),
            state_report_handler: RwLock::new(None),
        }
    }

    /// Attach a handler through which the engine answers state-size report requests.
    pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
        *self.state_report_handler.write().await = Some(handler);
        self
    }

    /// Create a streaming engine along with `num_workers` workers; the workers
    /// are returned so the caller can run them on dedicated threads.
    pub fn new_with_workers<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
        num_workers: usize,
    ) -> (Self, Vec<Worker<'s>>) {
        let mut zelf = Self::new(node_id, query_engine, table_meta);

        let workers: Vec<_> = (0..num_workers)
            .map(|_| {
                let (handle, worker) = create_worker();
                zelf.add_worker_handle(handle);
                worker
            })
            .collect();
        (zelf, workers)
    }

    /// Register a worker handle with this engine.
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        self.worker_handles.push(handle);
    }
}

/// A set of rows to insert into or delete from a sink table, each with a timestamp.
#[derive(Debug)]
pub enum DiffRequest {
    Insert(Vec<(Row, repr::Timestamp)>),
    Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
    /// Number of rows in this request.
    pub fn len(&self) -> usize {
        match self {
            Self::Insert(v) => v.len(),
            Self::Delete(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Convert a list of [`Batch`]es into insert [`DiffRequest`]s, one per batch,
/// with all rows stamped at timestamp 0.
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
    let mut reqs = Vec::new();
    for batch in batches {
        let mut rows = Vec::with_capacity(batch.row_count());
        for i in 0..batch.row_count() {
            let row = batch.get_row(i).context(EvalSnafu)?;
            rows.push((Row::new(row), 0));
        }
        reqs.push(DiffRequest::Insert(rows));
    }
    Ok(reqs)
}

/// Writeback: draining sink buffers and writing results to the sink tables.
impl StreamingEngine {
    /// Drain all sink buffers and write their contents back to the sink tables
    /// through the frontend. Returns the number of requests sent.
    pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
        let all_reqs = self.generate_writeback_request().await?;
        if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
            return Ok(0);
        }
        let mut req_cnt = 0;
        for (table_name, reqs) in all_reqs {
            if reqs.is_empty() {
                continue;
            }
            let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
            let ctx = Arc::new(QueryContext::with(&catalog, &schema));

            let (is_ts_placeholder, proto_schema) = match self
                .try_fetch_existing_table(&table_name)
                .await?
                .context(UnexpectedSnafu {
                    reason: format!("Table not found: {}", table_name.join(".")),
                }) {
                Ok(r) => r,
                Err(e) => {
                    // the sink table may have been dropped in the meantime; if
                    // so, skip the writeback instead of failing
                    if self
                        .table_info_source
                        .get_opt_table_id_from_name(&table_name)
                        .await?
                        .is_none()
                    {
                        common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
                        continue;
                    } else {
                        return Err(e);
                    }
                }
            };
            let schema_len = proto_schema.len();

            let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
            trace!(
                "Sending {} writeback requests to table {}, reqs total rows={}",
                reqs.len(),
                table_name.join("."),
                total_rows
            );

            METRIC_FLOW_ROWS
                .with_label_values(&["out-streaming"])
                .inc_by(total_rows as u64);

            let now = self.tick_manager.tick();
            for req in reqs {
                match req {
                    DiffRequest::Insert(insert) => {
                        let rows_proto: Vec<v1::Row> = insert
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                // append the `update_at` timestamp if the
                                // schema expects one more timestamp column
                                if row.len() < proto_schema.len()
                                    && proto_schema[row.len()].datatype
                                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
                                            as i32
                                {
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(now),
                                    )]);
                                }
                                // if the sink table uses a placeholder time
                                // index, fill it with timestamp 0
                                if is_ts_placeholder {
                                    ensure!(
                                        row.len() == schema_len - 1,
                                        InternalSnafu {
                                            reason: format!(
                                                "Row len mismatch, expect {} got {}",
                                                schema_len - 1,
                                                row.len()
                                            )
                                        }
                                    );
                                    row.extend([Value::from(
                                        common_time::Timestamp::new_millisecond(0),
                                    )]);
                                }
                                if row.len() != proto_schema.len() {
                                    UnexpectedSnafu {
                                        reason: format!(
                                            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
                                            proto_schema.len(),
                                            row.len(),
                                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
                                        ),
                                    }
                                    .fail()?;
                                }
                                Ok(row.into())
                            })
                            .collect::<Result<Vec<_>, Error>>()?;
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowInsertRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };
                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                    DiffRequest::Delete(remove) => {
                        info!("original remove rows={:?}", remove);
                        let rows_proto: Vec<v1::Row> = remove
                            .into_iter()
                            .map(|(mut row, _ts)| {
                                row.extend(Some(Value::from(
                                    common_time::Timestamp::new_millisecond(0),
                                )));
                                row.into()
                            })
                            .collect::<Vec<_>>();
                        let table_name = table_name.last().unwrap().clone();
                        let req = RowDeleteRequest {
                            table_name,
                            rows: Some(v1::Rows {
                                schema: proto_schema.clone(),
                                rows: rows_proto,
                            }),
                        };

                        req_cnt += 1;
                        self.frontend_invoker
                            .read()
                            .await
                            .as_ref()
                            .with_context(|| UnexpectedSnafu {
                                reason: "Expect a frontend invoker for flownode to write back",
                            })?
                            .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
                            .await
                            .map_err(BoxedError::new)
                            .with_context(|_| ExternalSnafu {})?;
                    }
                }
            }
        }
        Ok(req_cnt)
    }

    /// Generate writeback requests for all sink tables by draining the sink
    /// receivers in the node context.
    pub async fn generate_writeback_request(
        &self,
    ) -> Result<BTreeMap<TableName, Vec<DiffRequest>>, Error> {
        trace!("Start to generate writeback request");
        let mut output = BTreeMap::new();
        let mut total_row_count = 0;
        for (name, sink_recv) in self
            .node_context
            .write()
            .await
            .sink_receiver
            .iter_mut()
            .map(|(n, (_s, r))| (n, r))
        {
            let mut batches = Vec::new();
            while let Ok(batch) = sink_recv.try_recv() {
                total_row_count += batch.row_count();
                batches.push(batch);
            }
            let reqs = batches_to_rows_req(batches)?;
            output.insert(name.clone(), reqs);
        }
        trace!("Prepare writeback req: total row count={}", total_row_count);
        Ok(output)
    }

    /// Fetch the primary keys, time-index position and column schemas of the
    /// given table, or `None` if the table doesn't exist.
    async fn fetch_table_pk_schema(
        &self,
        table_name: &TableName,
    ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
        if let Some(table_id) = self
            .table_info_source
            .get_opt_table_id_from_name(table_name)
            .await?
        {
            let table_info = self
                .table_info_source
                .get_table_info_value(&table_id)
                .await?
                .unwrap();
            let meta = table_info.table_info.meta;
            let primary_keys = meta
                .primary_key_indices
                .into_iter()
                .map(|i| meta.schema.column_schemas[i].name.clone())
                .collect_vec();
            let schema = meta.schema.column_schemas;
            let time_index = meta.schema.timestamp_index;
            Ok(Some((primary_keys, time_index, schema)))
        } else {
            Ok(None)
        }
    }

    /// Derive the primary keys and column schemas for a sink table that is
    /// auto-created from the flow plan's output relation, returning
    /// `(primary keys, column schemas, whether a placeholder time index was added)`.
    async fn adjust_auto_created_table_schema(
        &self,
        schema: &RelationDesc,
    ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
        // use the first key set in the relation, if any, as the primary key
        let primary_keys = schema
            .typ()
            .keys
            .first()
            .map(|v| {
                v.column_indices
                    .iter()
                    .map(|i| {
                        schema
                            .get_name(*i)
                            .clone()
                            .unwrap_or_else(|| format!("col_{i}"))
                    })
                    .collect_vec()
            })
            .unwrap_or_default();
        let update_at = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let original_schema = relation_desc_to_column_schemas_with_fallback(schema);

        let mut with_auto_added_col = original_schema.clone();
        with_auto_added_col.push(update_at);

        // if the output has no time index, append a placeholder column to act as one
        let no_time_index = schema.typ().time_index.is_none();
        if no_time_index {
            let ts_col = ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
            with_auto_added_col.push(ts_col);
        }

        Ok((primary_keys, with_auto_added_col, no_time_index))
    }
}

/// Flow runtime: driving dataflow execution and reporting state.
impl StreamingEngine {
    /// Start the state-report handler, which answers state-size report
    /// requests (e.g. from the heartbeat task) until the sender side is closed.
    async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state size report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

    /// Run the engine's main loop on the global runtime, returning the task handle.
    pub fn run_background(
        self: Arc<Self>,
        shutdown: Option<broadcast::Receiver<()>>,
    ) -> JoinHandle<()> {
        info!("Starting flownode manager's background task");
        common_runtime::spawn_global(async move {
            let _state_report_handler = self.clone().start_state_report_handler().await;
            self.run(shutdown).await;
        })
    }

    /// Log all errors collected by the flows' error collectors.
    pub async fn log_all_errors(&self) {
        for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
            let all_errors = f_err.get_all().await;
            if !all_errors.is_empty() {
                let all_errors = all_errors
                    .into_iter()
                    .map(|i| format!("{:?}", i))
                    .join("\n");
                common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
            }
        }
    }

    /// Main loop: repeatedly run available dataflows, send writeback requests,
    /// and adapt the wait interval to the observed processing speed, until
    /// shutdown is signaled.
    pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
        debug!("Starting to run");
        let default_interval = Duration::from_secs(1);
        let mut tick_interval = tokio::time::interval(default_interval);
        // burst so the loop can catch up quickly after falling behind
        tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);

        // moving average of processing speed, in rows per second
        let mut avg_spd = 0;
        let mut since_last_run = tokio::time::Instant::now();
        let run_per_trace = 10;
        let mut run_cnt = 0;
        loop {
            let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
                common_telemetry::error!(err;"Run available errors");
                0
            });

            if let Err(err) = self.send_writeback_requests().await {
                common_telemetry::error!(err;"Send writeback request errors");
            };
            self.log_all_errors().await;

            // check for a shutdown signal without blocking
            match &shutdown.as_mut().map(|s| s.try_recv()) {
                Some(Ok(())) => {
                    info!("Shutdown flow's main loop");
                    break;
                }
                Some(Err(TryRecvError::Empty)) => (),
                Some(Err(TryRecvError::Closed)) => {
                    common_telemetry::error!("Shutdown channel is closed");
                    break;
                }
                Some(Err(TryRecvError::Lagged(num))) => {
                    common_telemetry::error!(
                        "Shutdown channel is lagged by {}, meaning multiple shutdown cmd have been issued",
                        num
                    );
                    break;
                }
                None => (),
            }

            let wait_for = since_last_run.elapsed();

            // rows per second since the last iteration
            let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
            // rapid increase, slow decrease: adopt higher speeds immediately,
            // decay towards lower ones gradually
            avg_spd = if cur_spd > avg_spd {
                cur_spd
            } else {
                (9 * avg_spd + cur_spd) / 10
            };
            // wait just long enough to accumulate roughly one batch, but no
            // longer than the default interval
            let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1);
            let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);

            if run_cnt >= run_per_trace {
                // only emit traces every `run_per_trace` iterations to avoid
                // flooding the logs
                trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
                trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
                run_cnt = 0;
            } else {
                run_cnt += 1;
            }

            METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
            since_last_run = tokio::time::Instant::now();
            tokio::select! {
                _ = tick_interval.tick() => (),
                _ = tokio::time::sleep(new_wait) => ()
            }
        }
        // release the frontend invoker when the main loop exits
        self.frontend_invoker.write().await.take();
    }

    /// Run all available dataflows in this node once.
    ///
    /// With `blocking` set, wait for the node-context lock; otherwise return
    /// immediately if it cannot be acquired. Returns the (approximate) number
    /// of rows flushed to the workers.
    pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
        let mut row_cnt = 0;

        let now = self.tick_manager.tick();
        for worker in self.worker_handles.iter() {
            worker.run_available(now, blocking).await?;
        }
        // flush the source senders so pending rows reach the workers
        let flush_res = if blocking {
            let ctx = self.node_context.read().await;
            ctx.flush_all_sender().await
        } else {
            match self.node_context.try_read() {
                Ok(ctx) => ctx.flush_all_sender().await,
                Err(_) => return Ok(row_cnt),
            }
        };
        match flush_res {
            Ok(r) => {
                common_telemetry::trace!("Total flushed {} rows", r);
                row_cnt += r;
            }
            Err(err) => {
                common_telemetry::error!("Flush send buf errors: {:?}", err);
            }
        };

        Ok(row_cnt)
    }

    /// Route a write request for the given region to the corresponding source sender.
    pub async fn handle_write_request(
        &self,
        region_id: RegionId,
        rows: Vec<DiffRow>,
        batch_datatypes: &[ConcreteDataType],
    ) -> Result<(), Error> {
        let rows_len = rows.len();
        let table_id = region_id.table_id();
        let _timer = METRIC_FLOW_INSERT_ELAPSED
            .with_label_values(&[table_id.to_string().as_str()])
            .start_timer();
        self.node_context
            .read()
            .await
            .send(table_id, rows, batch_datatypes)
            .await?;
        trace!(
            "Handling write request for table_id={} with {} rows",
            table_id, rows_len
        );
        Ok(())
    }
}

/// Flow task management: creating, removing, flushing and querying flows.
impl StreamingEngine {
    /// Remove a flow by its id from whichever worker runs it.
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                handle.remove_flow(flow_id).await?;
                break;
            }
        }
        self.node_context.write().await.remove_flow(flow_id);
        Ok(())
    }

    /// Create a flow from the given args and return its id.
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval: _,
            comment,
            sql,
            flow_options,
            query_ctx,
        } = args;

        let mut node_ctx = self.node_context.write().await;
        // register source and sink tables (and their schemas) in the node context
        for source in &source_table_ids {
            node_ctx
                .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
                .await?;
        }
        node_ctx
            .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
            .await?;

        node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());

        node_ctx.query_context = query_ctx.map(Arc::new);
        // parse the SQL into a flow plan
        let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);

        // if the sink table already exists, check that its schema matches the
        // plan's inferred output schema; otherwise auto-create it
        if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
            let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);

            // compare the inferred schema with the actual table schema column by column
            for (idx, zipped) in auto_schema
                .iter()
                .zip_longest(real_schema.iter())
                .enumerate()
            {
                match zipped {
                    EitherOrBoth::Both(auto, real) => {
                        if auto.data_type != real.data_type {
                            InvalidQuerySnafu {
                                reason: format!(
                                    "Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
                                    idx,
                                    real.name,
                                    auto.name,
                                    real.data_type,
                                    auto.data_type
                                ),
                            }
                            .fail()?;
                        }
                    }
                    EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
                        // trailing timestamp columns (`update_at` or the
                        // placeholder time index) are auto-added, so the
                        // inferred schema may legitimately lack them
                        continue;
                    }
                    _ => InvalidQuerySnafu {
                        reason: format!(
                            "schema length mismatched, expected {} found {}",
                            real_schema.len(),
                            auto_schema.len()
                        ),
                    }
                    .fail()?,
                }
            }
        } else {
            // sink table doesn't exist yet: create it from the plan's schema
            let did_create = self
                .create_table_from_relation(
                    &format!("flow-id={flow_id}"),
                    &sink_table_name,
                    &flow_plan.schema,
                )
                .await?;
            if !did_create {
                UnexpectedSnafu {
                    reason: format!("Failed to create table {:?}", sink_table_name),
                }
                .fail()?;
            }
        }

        node_ctx.add_flow_plan(flow_id, flow_plan.clone());

        let _ = comment;
        let _ = flow_options;

        // wire the flow's sink and sources to the node context's channels
        let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
        let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;

        let source_ids = source_table_ids
            .iter()
            .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
            .collect_vec();
        let source_receivers = source_ids
            .iter()
            .map(|id| {
                node_ctx
                    .get_source_by_global_id(id)
                    .map(|s| s.get_receiver())
            })
            .collect::<Result<Vec<_>, _>>()?;
        let err_collector = ErrCollector::default();
        self.flow_err_collectors
            .write()
            .await
            .insert(flow_id, err_collector.clone());
        // pick a worker to host the dataflow
        let handle = self.get_worker_handle_for_create_flow().await;
        let create_request = worker::Request::Create {
            flow_id,
            plan: flow_plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs: source_receivers,
            expire_after,
            or_replace,
            create_if_not_exists,
            err_collector,
        };

        handle.create_flow(create_request).await?;
        info!("Successfully create flow with id={}", flow_id);
        Ok(Some(flow_id))
    }

    /// Flush a flow: drain its inputs, run it to completion, and write the
    /// results back. Returns the number of rows written back.
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Starting to flush flow_id={:?}", flow_id);
        // acquire and immediately drop the write lock, ensuring writes that
        // started before the flush reach the flow without blocking later ones
        drop(self.flush_lock.write().await);
        let flushed_input_rows = self.node_context.read().await.flush_all_sender().await?;
        let rows_send = self.run_available(true).await?;
        let row = self.send_writeback_requests().await?;
        debug!(
            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
            flow_id, flushed_input_rows, rows_send, row
        );
        Ok(row)
    }

    /// Check whether any worker runs the given flow.
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> Result<bool, Error> {
        let mut exist = false;
        for handle in self.worker_handles.iter() {
            if handle.contains_flow(flow_id).await? {
                exist = true;
                break;
            }
        }
        Ok(exist)
    }
}

/// Tracks the passage of time for flow execution using a monotonic clock
/// anchored to the system time observed at startup.
#[derive(Clone, Debug)]
pub struct FlowTickManager {
    /// The instant this manager was created, used as the monotonic anchor.
    start: Instant,
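    /// The system timestamp in milliseconds when this manager was created.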
    start_timestamp: repr::Timestamp,
}

impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

impl FlowTickManager {
    pub fn new() -> Self {
        FlowTickManager {
            start: Instant::now(),
            start_timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as repr::Timestamp,
        }
    }

    /// Current timestamp in milliseconds, derived from the monotonic clock
    /// plus the start timestamp, so consecutive ticks never move backwards.
    pub fn tick(&self) -> repr::Timestamp {
        let current = Instant::now();
        let since_the_epoch = current - self.start;
        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
    }
}