1use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use api::v1::flow::{
21 CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse, FlushFlow,
22 flow_request,
23};
24use api::v1::region::InsertRequests;
25use catalog::CatalogManager;
26use common_base::Plugins;
27use common_error::ext::BoxedError;
28use common_meta::ddl::create_flow::{
29 FlowType, INTERNAL_EVAL_SCHEDULE_KEY, effective_eval_schedule_from_flow_info,
30};
31use common_meta::error::Result as MetaResult;
32use common_meta::key::flow::FlowMetadataManager;
33use common_meta::key::flow::flow_info::FlowScheduleConfig;
34use common_meta::key::flow::flow_state::FlowStat;
35use common_runtime::JoinHandle;
36use common_telemetry::{error, info, trace, warn};
37use datatypes::value::Value;
38use futures::TryStreamExt;
39use itertools::Itertools;
40use operator::utils::try_to_session_query_context;
41use session::context::QueryContextBuilder;
42use snafu::{IntoError, OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use tokio::sync::{Mutex, RwLock};
45
46use crate::adapter::{CreateFlowArgs, StreamingEngine};
47use crate::batching_mode::engine::BatchingEngine;
48use crate::engine::{FlowEngine, FlowStatProvider};
49use crate::error::{
50 CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
51 IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
52 NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, UnsupportedSnafu,
53};
54use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
55use crate::repr::{self, DiffRow};
56use crate::utils::StateReportHandler;
57use crate::{Error, FlowId};
58
/// Shared, reference-counted handle to a [`FlowDualEngine`].
pub type FlowDualEngineRef = Arc<FlowDualEngine>;
61
/// Flow engine that owns both a streaming and a batching engine and routes
/// flow operations and insert requests to whichever engine owns the flow.
pub struct FlowDualEngine {
    /// Engine used for flows whose type is `FlowType::Streaming`.
    streaming_engine: Arc<StreamingEngine>,
    /// Engine used for flows whose type is `FlowType::Batching`.
    batching_engine: Arc<BatchingEngine>,
    /// Handler for state report requests; taken (left as `None`) once the
    /// state report task is started.
    state_report_handler: RwLock<Option<StateReportHandler>>,
    /// Source-table to flow routing table, also recording each flow's type.
    src_table2flow: RwLock<SrcTableToFlow>,
    /// Access to the flow metadata used as the source of truth for reconciliation.
    flow_metadata_manager: Arc<FlowMetadataManager>,
    /// Used to enumerate catalogs when there is no node id.
    catalog_manager: Arc<dyn CatalogManager>,
    /// Background consistency check task, if one has been started.
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    /// Set to `true` once the initial flow recovery has completed.
    done_recovering: AtomicBool,
}
79
80impl FlowDualEngine {
81 pub fn new(
82 streaming_engine: Arc<StreamingEngine>,
83 batching_engine: Arc<BatchingEngine>,
84 flow_metadata_manager: Arc<FlowMetadataManager>,
85 catalog_manager: Arc<dyn CatalogManager>,
86 plugins: Plugins,
87 ) -> Self {
88 Self {
89 streaming_engine,
90 batching_engine,
91 state_report_handler: Default::default(),
92 src_table2flow: RwLock::new(SrcTableToFlow::default()),
93 flow_metadata_manager,
94 catalog_manager,
95 check_task: Mutex::new(None),
96 plugins,
97 done_recovering: AtomicBool::new(false),
98 }
99 }
100
101 pub fn set_done_recovering(&self) {
104 info!("FlowDualEngine done recovering");
105 self.done_recovering
106 .store(true, std::sync::atomic::Ordering::Release);
107 }
108
109 pub fn is_recover_done(&self) -> bool {
111 self.done_recovering
112 .load(std::sync::atomic::Ordering::Acquire)
113 }
114
115 async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
117 if self.is_recover_done() {
118 return Ok(());
119 }
120
121 warn!(
122 "FlowDualEngine is not done recovering, {} insert request waiting for recovery",
123 waiting_req_cnt
124 );
125 let mut retry = 0;
128 let max_retry = 3;
129 while retry < max_retry && !self.is_recover_done() {
130 warn!(
131 "FlowDualEngine is not done recovering, retry {} in 1s",
132 retry
133 );
134 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
135 retry += 1;
136 }
137 if retry == max_retry {
138 return FlowNotRecoveredSnafu.fail();
139 } else {
140 info!("FlowDualEngine is done recovering");
141 }
142 Ok(())
144 }
145
    /// Returns the plugins this engine was constructed with.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }
149
    /// Reports whether the engine runs in distributed mode, which is decided
    /// by whether the streaming engine has a node id.
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }
154
155 pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
156 self.streaming_engine.clone()
157 }
158
159 pub fn batching_engine(&self) -> Arc<BatchingEngine> {
160 self.batching_engine.clone()
161 }
162
    /// Installs the state report handler, replacing any previously stored one.
    ///
    /// The handler is consumed later by [`Self::start_state_report_task`].
    pub async fn set_state_report_handler(&self, handler: StateReportHandler) {
        *self.state_report_handler.write().await = Some(handler);
    }
166
167 pub async fn gen_state_report(&self) -> FlowStat {
168 let streaming = self.streaming_engine.flow_stat().await;
169 let batching = self.batching_engine.flow_stat().await;
170
171 let mut state_size = streaming.state_size;
172 state_size.extend(batching.state_size);
173
174 let mut last_exec_time_map = streaming.last_exec_time_map;
175 last_exec_time_map.extend(batching.last_exec_time_map);
176
177 FlowStat {
178 state_size,
179 last_exec_time_map,
180 }
181 }
182
183 pub async fn start_state_report_task(self: Arc<Self>) -> Option<JoinHandle<()>> {
187 let state_report_handler = self.state_report_handler.write().await.take();
188 if let Some(mut handler) = state_report_handler {
189 let zelf = self.clone();
190 let handler = common_runtime::spawn_global(async move {
191 while let Some(ret_handler) = handler.recv().await {
192 let state_report = zelf.gen_state_report().await;
193 ret_handler.send(state_report).unwrap_or_else(|err| {
194 common_telemetry::error!(err; "Send state report error");
195 });
196 }
197 });
198 Some(handler)
199 } else {
200 None
201 }
202 }
203
204 async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
208 if !self.is_distributed() {
209 return Ok(());
210 }
211 let frontend_client = self.batching_engine().frontend_client.clone();
212 let sleep_duration = std::time::Duration::from_millis(1_000);
213 let now = std::time::Instant::now();
214 loop {
215 let frontend_list = frontend_client.scan_for_frontend().await?;
216 if !frontend_list.is_empty() {
217 let fe_list = frontend_list
218 .iter()
219 .map(|peer| &peer.addr)
220 .collect::<Vec<_>>();
221 let probe_failures = frontend_client
222 .check_all_frontends_without_auth(&frontend_list)
223 .await?;
224 if probe_failures.is_empty() {
225 info!(
226 "Available frontend found and unauthenticated probe succeeded: {:?}",
227 fe_list
228 );
229 return Ok(());
230 }
231 warn!(
232 "Unauthenticated frontend probe failed, will retry. frontends={:?}, failures={:?}",
233 fe_list, probe_failures
234 );
235 }
236 let elapsed = now.elapsed();
237 tokio::time::sleep(sleep_duration).await;
238 info!("Waiting for available frontend, elapsed={:?}", elapsed);
239 if elapsed >= timeout {
240 return NoAvailableFrontendSnafu {
241 timeout,
242 context: "No frontend accepted unauthenticated flownode probe",
243 }
244 .fail();
245 }
246 }
247 }
248
249 async fn try_sync_with_check_task(
253 &self,
254 flow_id: FlowId,
255 allow_drop: bool,
256 ) -> Result<(), Error> {
257 info!("Try to sync with check task for flow {}", flow_id);
259 let mut retry = 0;
260 let max_retry = 10;
261 while retry < max_retry {
263 if let Some(task) = self.check_task.lock().await.as_ref() {
264 task.trigger(false, allow_drop).await?;
265 break;
266 }
267 retry += 1;
268 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
269 }
270
271 if retry == max_retry {
272 error!(
273 "Can't sync with check task for flow {} with allow_drop={}",
274 flow_id, allow_drop
275 );
276 return SyncCheckTaskSnafu {
277 flow_id,
278 allow_drop,
279 }
280 .fail();
281 }
282 info!("Successfully sync with check task for flow {}", flow_id);
283
284 Ok(())
285 }
286
    /// Compares the flows recorded in metadata with the flows that actually
    /// exist in the engines, and optionally repairs the difference.
    ///
    /// * `allow_create` - recreate flows that are in metadata but missing
    ///   from the engines; otherwise only a warning is logged.
    /// * `allow_drop` - remove flows that exist in the engines but not in
    ///   metadata; otherwise only a warning is logged.
    ///
    /// # Errors
    ///
    /// Listing flows and looking up flow info propagate errors and abort the
    /// pass. Failures of individual create/drop operations are collected and
    /// logged as warnings without failing the pass.
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        let nodeid = self.streaming_engine.node_id;
        // Expected flow ids: with a node id, the flows recorded for that
        // flownode; without one, every flow of every catalog.
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        // Flows currently present in either engine.
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        // In metadata but not in the engines.
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        // In the engines but not in metadata.
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    // NOTE: a failed or empty lookup returns from the whole
                    // function here, unlike create failures below which are
                    // collected.
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // Recovery always asks for create-if-not-exists and
                        // or-replace, regardless of how the flow was created.
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        eval_interval: info.eval_interval(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        eval_schedule: effective_eval_schedule_from_flow_info(&info),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                try_to_session_query_context(ctx)
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            .or_else(|| {
                                // No stored query context: fall back to one
                                // that only carries the flow's catalog.
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().clone())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recover flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows do not exist in flownode for node {:?}, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows do not exist in metadata for node {:?}, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }
453
454 pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
456 let mut check_task = self.check_task.lock().await;
457 ensure!(
458 check_task.is_none(),
459 IllegalCheckTaskStateSnafu {
460 reason: "Flow consistent check task already exists",
461 }
462 );
463 let task = ConsistentCheckTask::start_check_task(self).await?;
464 *check_task = Some(task);
465 Ok(())
466 }
467
468 pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
469 info!("Stopping flow consistent check task");
470 let mut check_task = self.check_task.lock().await;
471
472 ensure!(
473 check_task.is_some(),
474 IllegalCheckTaskStateSnafu {
475 reason: "Flow consistent check task does not exist",
476 }
477 );
478
479 check_task.take().unwrap().stop().await?;
480 info!("Stopped flow consistent check task");
481 Ok(())
482 }
483
    /// Runs one consistency pass that is allowed to both create missing flows
    /// and drop flows that are no longer present in metadata.
    pub async fn reconcile_flows_from_metadata(&self) -> Result<(), Error> {
        self.check_flow_consistent(true, true).await
    }
488
489 async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
491 self.flow_metadata_manager
492 .flow_info_manager()
493 .get(flow_id as u32)
494 .await
495 .map_err(BoxedError::new)
496 .context(ExternalSnafu)
497 .map(|info| info.is_some())
498 }
499}
500
/// Handle to the background task that keeps the engines consistent with
/// flow metadata.
struct ConsistentCheckTask {
    /// Join handle of the spawned background loop.
    handle: JoinHandle<()>,
    /// Signals the background loop to exit.
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    /// Requests an immediate check: `(allow_create, allow_drop, done)`, where
    /// `done` is fired once the requested check has run.
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}
506
507impl ConsistentCheckTask {
    /// Waits for an available frontend, then spawns the background loop.
    ///
    /// The spawned loop first retries flow recovery (create allowed, drop not
    /// allowed) until it succeeds, marks the engine as recovered, and then
    /// runs a consistency check after every trigger or 10 second timeout
    /// until a shutdown signal arrives.
    ///
    /// # Errors
    ///
    /// Returns the error from waiting for an available frontend.
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let min_refresh_duration = engine
            .batching_engine()
            .batch_opts
            .experimental_min_refresh_duration;
        let frontend_scan_timeout = engine
            .batching_engine()
            .batch_opts
            .experimental_frontend_scan_timeout;
        engine
            .wait_for_available_frontend(frontend_scan_timeout)
            .await?;
        // `tx`/`rx`: shutdown signal. `trigger_tx`/`trigger_rx`: on-demand checks.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            let mut recover_retry = 0;
            // Initial recovery: retried without limit, sleeping
            // `min_refresh_duration` between attempts.
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    min_refresh_duration.as_secs()
                );
                tokio::time::sleep(min_refresh_duration).await;
            }

            engine.set_done_recovering();

            // Flags for the next pass; periodic passes only report differences.
            let (mut allow_create, mut allow_drop) = (false, false);
            // Completion signal of the trigger that requested the next pass.
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                // Notify the waiting trigger, if any; a dropped receiver is ignored.
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }
567
568 async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
569 let (tx, rx) = tokio::sync::oneshot::channel();
570 self.trigger_tx
571 .send((allow_create, allow_drop, tx))
572 .await
573 .map_err(|_| {
574 IllegalCheckTaskStateSnafu {
575 reason: "Failed to send trigger signal",
576 }
577 .build()
578 })?;
579 rx.await.map_err(|_| {
580 IllegalCheckTaskStateSnafu {
581 reason: "Failed to receive trigger signal",
582 }
583 .build()
584 })?;
585 Ok(())
586 }
587
588 async fn stop(self) -> Result<(), Error> {
589 self.shutdown_tx.send(()).await.map_err(|_| {
590 IllegalCheckTaskStateSnafu {
591 reason: "Failed to send shutdown signal",
592 }
593 .build()
594 })?;
595 self.handle.abort();
597 Ok(())
598 }
599}
600
/// Routing table from source tables to the flows that read them, kept
/// separately per flow type.
#[derive(Default)]
struct SrcTableToFlow {
    /// Source table id -> streaming flows reading from it.
    stream: HashMap<TableId, HashSet<FlowId>>,
    /// Source table id -> batching flows reading from it.
    batch: HashMap<TableId, HashSet<FlowId>>,
    /// Flow id -> (flow type, source table ids), used to undo a registration.
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}
610
611impl SrcTableToFlow {
612 fn in_stream(&self, table_id: TableId) -> bool {
613 self.stream.contains_key(&table_id)
614 }
615 fn in_batch(&self, table_id: TableId) -> bool {
616 self.batch.contains_key(&table_id)
617 }
618 fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
619 let mapping = match flow_type {
620 FlowType::Streaming => &mut self.stream,
621 FlowType::Batching => &mut self.batch,
622 };
623
624 for src_table in src_table_ids.clone() {
625 mapping
626 .entry(src_table)
627 .and_modify(|flows| {
628 flows.insert(flow_id);
629 })
630 .or_insert_with(|| {
631 let mut set = HashSet::new();
632 set.insert(flow_id);
633 set
634 });
635 }
636 self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
637 }
638
639 fn remove_flow(&mut self, flow_id: FlowId) {
640 let mapping = match self.get_flow_type(flow_id) {
641 Some(FlowType::Streaming) => &mut self.stream,
642 Some(FlowType::Batching) => &mut self.batch,
643 None => return,
644 };
645 if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
646 for src_table in src_table_ids {
647 if let Some(flows) = mapping.get_mut(&src_table) {
648 flows.remove(&flow_id);
649 }
650 }
651 }
652 }
653
654 fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
655 self.flow_infos
656 .get(&flow_id)
657 .map(|(flow_type, _)| flow_type)
658 .cloned()
659 }
660}
661
662impl FlowEngine for FlowDualEngine {
663 async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
664 let flow_type = args
665 .flow_options
666 .get(FlowType::FLOW_TYPE_KEY)
667 .map(|s| s.as_str());
668
669 let flow_type = match flow_type {
670 Some(FlowType::BATCHING) => FlowType::Batching,
671 Some(FlowType::STREAMING) => FlowType::Streaming,
672 None => FlowType::Batching,
673 Some(flow_type) => {
674 return InternalSnafu {
675 reason: format!("Invalid flow type: {}", flow_type),
676 }
677 .fail();
678 }
679 };
680
681 let flow_id = args.flow_id;
682 let src_table_ids = args.source_table_ids.clone();
683
684 let res = match flow_type {
685 FlowType::Batching => self.batching_engine.create_flow(args).await,
686 FlowType::Streaming => self.streaming_engine.create_flow(args).await,
687 }?;
688
689 self.src_table2flow
690 .write()
691 .await
692 .add_flow(flow_id, flow_type, src_table_ids);
693
694 Ok(res)
695 }
696
697 async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
698 let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
699
700 match flow_type {
701 Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
702 Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
703 None => {
704 warn!(
708 "Flow {} is not exist in the underlying engine, but exist in metadata",
709 flow_id
710 );
711 self.try_sync_with_check_task(flow_id, true).await?;
712
713 Ok(())
714 }
715 }?;
716 self.src_table2flow.write().await.remove_flow(flow_id);
718 Ok(())
719 }
720
721 async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
722 self.try_sync_with_check_task(flow_id, false).await?;
724 let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
725 match flow_type {
726 Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
727 Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
728 None => {
729 warn!(
730 "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
731 );
732 Ok(0)
733 }
734 }
735 }
736
737 async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
738 let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
739 match flow_type {
741 Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
742 Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
743 None => Ok(false),
744 }
745 }
746
747 async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
748 let stream_flows = self.streaming_engine.list_flows().await?;
749 let batch_flows = self.batching_engine.list_flows().await?;
750
751 Ok(stream_flows.into_iter().chain(batch_flows))
752 }
753
754 async fn handle_flow_inserts(
755 &self,
756 request: api::v1::region::InsertRequests,
757 ) -> Result<(), Error> {
758 self.wait_for_all_flow_recover(request.requests.len())
759 .await?;
760 let mut to_stream_engine = Vec::with_capacity(request.requests.len());
762 let mut to_batch_engine = request.requests;
763
764 let mut batching_row_cnt = 0;
765 let mut streaming_row_cnt = 0;
766
767 {
768 let src_table2flow = self.src_table2flow.read().await;
770 to_batch_engine.retain(|req| {
771 let region_id = RegionId::from(req.region_id);
772 let table_id = region_id.table_id();
773 let is_in_stream = src_table2flow.in_stream(table_id);
774 let is_in_batch = src_table2flow.in_batch(table_id);
775 if is_in_stream {
776 streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
777 to_stream_engine.push(req.clone());
778 }
779 if is_in_batch {
780 batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
781 return true;
782 }
783 if !is_in_batch && !is_in_stream {
784 warn!("Table {} is not any flow's source table", table_id)
786 }
787 false
788 });
789 }
792
793 METRIC_FLOW_ROWS
794 .with_label_values(&["in-streaming"])
795 .inc_by(streaming_row_cnt as u64);
796
797 METRIC_FLOW_ROWS
798 .with_label_values(&["in-batching"])
799 .inc_by(batching_row_cnt as u64);
800
801 let streaming_engine = self.streaming_engine.clone();
802 let stream_handler: JoinHandle<Result<(), Error>> =
803 common_runtime::spawn_global(async move {
804 streaming_engine
805 .handle_flow_inserts(api::v1::region::InsertRequests {
806 requests: to_stream_engine,
807 })
808 .await?;
809 Ok(())
810 });
811 self.batching_engine
812 .handle_flow_inserts(api::v1::region::InsertRequests {
813 requests: to_batch_engine,
814 })
815 .await?;
816 stream_handler.await.context(JoinTaskSnafu)??;
817
818 Ok(())
819 }
820
    /// Forwards the dirty-window request to the batching engine; the
    /// streaming engine is not involved.
    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        self.batching_engine.handle_mark_window_dirty(req).await
    }
827}
828
829#[async_trait::async_trait]
830impl common_meta::node_manager::Flownode for FlowDualEngine {
    /// Handles a flow DDL-style request: create, drop or flush.
    ///
    /// Only request bodies with all required ids/names present are accepted;
    /// any other body (including a missing one) is rejected with
    /// `InvalidFlowRequestBody`. Engine errors are converted with
    /// `to_meta_err`.
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        // Query context carried by the request header, if any.
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                eval_interval,
                comment,
                sql,
                mut flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);

                // The internal schedule entry is removed from the options and
                // passed on as a dedicated argument instead.
                let eval_schedule = decode_internal_eval_schedule(&mut flow_options)
                    .map_err(to_meta_err(snafu::location!()))?;

                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    eval_interval: eval_interval.map(|e| e.seconds),
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                    eval_schedule,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                // Incremented on every successful call, whether or not an id
                // was returned.
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }
914
    /// Handles insert requests through [`FlowEngine::handle_flow_inserts`]
    /// and answers with a default response on success.
    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }
921
    /// Passes the dirty-window request to the batching engine's
    /// `handle_mark_dirty_time_window` and answers with a default response
    /// on success.
    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> MetaResult<FlowResponse> {
        self.batching_engine()
            .handle_mark_dirty_time_window(req)
            .await
            .map(|_| FlowResponse::default())
            .map_err(to_meta_err(snafu::location!()))
    }
929}
930
931fn decode_internal_eval_schedule(
934 flow_options: &mut HashMap<String, String>,
935) -> Result<Option<FlowScheduleConfig>, Error> {
936 match flow_options.remove(INTERNAL_EVAL_SCHEDULE_KEY) {
937 Some(json) => serde_json::from_str::<FlowScheduleConfig>(&json)
938 .map(Some)
939 .map_err(|err| {
940 InternalSnafu {
941 reason: format!("Invalid internal eval schedule payload: {err}"),
942 }
943 .build()
944 }),
945 None => Ok(None),
946 }
947}
948
949fn to_meta_err(
951 location: snafu::Location,
952) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
953 move |err: crate::error::Error| -> common_meta::error::Error {
954 match err {
955 crate::error::Error::FlowNotFound { id, .. } => {
956 common_meta::error::Error::FlowNotFound {
957 flow_name: format!("flow_id={id}"),
958 location,
959 }
960 }
961 _ => common_meta::error::Error::External {
962 location,
963 source: BoxedError::new(err),
964 },
965 }
966 }
967}
968
/// [`FlowEngine`] for the streaming engine; every supported operation
/// delegates to the matching `*_inner` method.
impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    /// Lists flow ids as the keys of `flow_err_collectors`.
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }

    /// Not supported by the streaming engine; always returns an
    /// `Unsupported` error.
    async fn handle_mark_window_dirty(
        &self,
        _req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        UnsupportedSnafu {
            reason: "handle_mark_window_dirty in streaming engine",
        }
        .fail()
    }
}
1013
/// Describes where the value of one table column comes from when an inserted
/// row is rearranged into table column order.
#[derive(Debug, Clone)]
enum FetchFromRow {
    /// Take the value at this index of the inserted row.
    Idx(usize),
    /// The column is absent from the insert; use this default value.
    Default(Value),
}
1020
impl FetchFromRow {
    /// Produces the column value for `row`: a clone of the value at the
    /// stored index, or a clone of the stored default.
    ///
    /// # Panics
    ///
    /// Panics if the variant is `Idx` and the index is out of bounds for
    /// `row`.
    fn fetch(&self, row: &repr::Row) -> Value {
        match self {
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}
1030
impl StreamingEngine {
    /// Feeds insert requests into the streaming engine.
    ///
    /// For each request the inserted rows are rearranged into the table's
    /// column order, filling columns missing from the insert with their
    /// default values, and then handed to `handle_write_request`.
    ///
    /// # Errors
    ///
    /// Fails when the table schema cannot be loaded, a table column has no
    /// name, a column is neither in the insert nor has a default, or the
    /// write itself fails (reported as `InsertIntoFlow` with the flow ids of
    /// the table). Processing stops at the first failing request.
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
        // Non-blocking attempt to take the flush lock for reading; the
        // result is kept alive until the function returns, and inserts
        // proceed whether or not the attempt succeeded.
        let _flush_lock = self.flush_lock.try_read();
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            // A request without rows yields an empty schema and no rows.
            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            // Timestamp attached to every row of this request.
            let now = self.tick_manager.tick();

            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
                // Per-column default value; `None` if the column has no
                // default or creating it failed (the failure is only logged).
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                // Every table column must be named to be matched by name below.
                let table_col_names = table_col_names
                    .iter().enumerate()
                    .map(|(idx,name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                // Column name -> position in the inserted rows.
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                // For each table column: read it from the insert if present,
                // else fall back to the default, else fail.
                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals)
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

            // Reorder each row and tag it with the tick and a diff of 1.
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                // Attach the flows reading from this table to the error.
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}
1158
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_meta::ddl::create_flow::INTERNAL_EVAL_SCHEDULE_KEY;

    use super::decode_internal_eval_schedule;
    use crate::error::Error;

    /// A schedule payload that is not valid JSON must be reported as an
    /// `Internal` error, and the entry must still be removed from the options.
    #[test]
    fn test_malformed_internal_eval_schedule_json_is_error() {
        let mut flow_options = HashMap::new();
        flow_options.insert(
            INTERNAL_EVAL_SCHEDULE_KEY.to_string(),
            "not-json".to_string(),
        );

        let err = decode_internal_eval_schedule(&mut flow_options).unwrap_err();
        assert!(matches!(
            err,
            Error::Internal { reason, .. }
                if reason.contains("Invalid internal eval schedule payload")
        ));
        // The key is consumed even though decoding failed.
        assert!(!flow_options.contains_key(INTERNAL_EVAL_SCHEDULE_KEY));
    }
}