use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use api::v1::flow::{
    CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse, FlushFlow,
    flow_request,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use operator::utils::try_to_session_query_context;
use session::context::QueryContextBuilder;
use snafu::{IntoError, OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::{FlowEngine, FlowStatProvider};
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::utils::StateReportHandler;
use crate::{Error, FlowId};
pub type FlowDualEngineRef = Arc<FlowDualEngine>;

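/// A flow engine that routes each flow to one of two underlying engines:
/// a [`StreamingEngine`] for streaming flows and a [`BatchingEngine`] for
/// batching flows, based on the flow's [`FlowType`].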
pub struct FlowDualEngine {
    streaming_engine: Arc<StreamingEngine>,
    batching_engine: Arc<BatchingEngine>,
    /// Handler used to answer state-report requests.
    state_report_handler: RwLock<Option<StateReportHandler>>,
    /// Reverse index from source table id to the flows that read from it.
    src_table2flow: RwLock<SrcTableToFlow>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    /// Background task that keeps engine state consistent with flow metadata.
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    /// Set once all flows have been recovered after startup.
    done_recovering: AtomicBool,
}

impl FlowDualEngine {
    pub fn new(
        streaming_engine: Arc<StreamingEngine>,
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
            batching_engine,
            state_report_handler: Default::default(),
            src_table2flow: RwLock::new(SrcTableToFlow::default()),
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
            done_recovering: AtomicBool::new(false),
        }
    }

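    /// Mark recovery as complete so that incoming insert requests are no
    /// longer held back waiting for flow recovery.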
    pub fn set_done_recovering(&self) {
        info!("FlowDualEngine done recovering");
        self.done_recovering
            .store(true, std::sync::atomic::Ordering::Release);
    }

    pub fn is_recover_done(&self) -> bool {
        self.done_recovering
            .load(std::sync::atomic::Ordering::Acquire)
    }

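    /// Wait until flow recovery is done, polling the flag a few times before
    /// giving up with a `FlowNotRecovered` error.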
    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
        if self.is_recover_done() {
            return Ok(());
        }

        warn!(
            "FlowDualEngine is not done recovering, {} insert requests are waiting for recovery",
            waiting_req_cnt
        );
        let mut retry = 0;
        let max_retry = 3;
        while retry < max_retry && !self.is_recover_done() {
            warn!(
                "FlowDualEngine is not done recovering, retry {} of {} in 1s",
                retry + 1,
                max_retry
            );
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            retry += 1;
        }
        // Re-check the flag rather than the retry counter: recovery may have
        // finished during the last sleep.
        if !self.is_recover_done() {
            return FlowNotRecoveredSnafu.fail();
        }
        info!("FlowDualEngine is done recovering");
        Ok(())
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }

    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
        self.batching_engine.clone()
    }

    pub async fn set_state_report_handler(&self, handler: StateReportHandler) {
        *self.state_report_handler.write().await = Some(handler);
    }

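    /// Merge the statistics of both engines into a single [`FlowStat`] report.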
    pub async fn gen_state_report(&self) -> FlowStat {
        let streaming = self.streaming_engine.flow_stat().await;
        let batching = self.batching_engine.flow_stat().await;

        let mut state_size = streaming.state_size;
        state_size.extend(batching.state_size);

        let mut last_exec_time_map = streaming.last_exec_time_map;
        last_exec_time_map.extend(batching.last_exec_time_map);

        FlowStat {
            state_size,
            last_exec_time_map,
        }
    }

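    /// Spawn a background task that answers state-report requests with the
    /// merged statistics from both engines, if a handler was set.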
    pub async fn start_state_report_task(self: Arc<Self>) -> Option<JoinHandle<()>> {
        let state_report_handler = self.state_report_handler.write().await.take();
        if let Some(mut handler) = state_report_handler {
            let zelf = self.clone();
            let handler = common_runtime::spawn_global(async move {
                while let Some(ret_handler) = handler.recv().await {
                    let state_report = zelf.gen_state_report().await;
                    ret_handler.send(state_report).unwrap_or_else(|err| {
                        common_telemetry::error!(err; "Send state report error");
                    });
                }
            });
            Some(handler)
        } else {
            None
        }
    }

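    /// In distributed mode, poll cluster info until at least one frontend is
    /// available or `timeout` elapses; a no-op in standalone mode.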
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            let elapsed = now.elapsed();
            tokio::time::sleep(sleep_duration).await;
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
        }
    }

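    /// Trigger the consistency-check task and wait for it to finish one round,
    /// so the engines' flow state reflects the metadata.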
    async fn try_sync_with_check_task(
        &self,
        flow_id: FlowId,
        allow_drop: bool,
    ) -> Result<(), Error> {
        info!("Trying to sync with check task for flow {}", flow_id);
        let mut retry = 0;
        let max_retry = 10;
        // The check task may not be started yet, so retry a few times.
        while retry < max_retry {
            if let Some(task) = self.check_task.lock().await.as_ref() {
                task.trigger(false, allow_drop).await?;
                break;
            }
            retry += 1;
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        if retry == max_retry {
            error!(
                "Can't sync with check task for flow {} with allow_drop={}",
                flow_id, allow_drop
            );
            return SyncCheckTaskSnafu {
                flow_id,
                allow_drop,
            }
            .fail();
        }
        info!("Successfully synced with check task for flow {}", flow_id);

        Ok(())
    }

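    /// Compare the flows recorded in metadata with the flows actually running
    /// in the two engines, recreating missing flows when `allow_create` is set
    /// and dropping stale ones when `allow_drop` is set.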
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        let nodeid = self.streaming_engine.node_id;
        // Flows that should exist according to the metadata.
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            // In standalone mode, scan every catalog for flows.
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // Recreating an existing flow is expected here.
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        eval_interval: info.eval_interval(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                try_to_session_query_context(ctx)
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            .or_else(|| {
                                // Older flows may lack a stored query context; fall
                                // back to one built from the flow's catalog.
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().clone())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recovered flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows exist in metadata but not in flownode {:?} (recreate not allowed), flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows exist in flownode {:?} but not in metadata (drop not allowed), flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }

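    /// Start the background consistency-check task; fails if it is already running.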
    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
        let mut check_task = self.check_task.lock().await;
        ensure!(
            check_task.is_none(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task already exists",
            }
        );
        let task = ConsistentCheckTask::start_check_task(self).await?;
        *check_task = Some(task);
        Ok(())
    }

    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
        info!("Stopping flow consistent check task");
        let mut check_task = self.check_task.lock().await;

        ensure!(
            check_task.is_some(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task does not exist",
            }
        );

        check_task.take().unwrap().stop().await?;
        info!("Stopped flow consistent check task");
        Ok(())
    }

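    /// Whether the flow is recorded in the flow metadata.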
    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_metadata_manager
            .flow_info_manager()
            .get(flow_id as u32)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
            .map(|info| info.is_some())
    }
}

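/// Background task that first recovers all flows from metadata, then keeps the
/// engines' flow state consistent with it, checking periodically or on demand.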
struct ConsistentCheckTask {
    handle: JoinHandle<()>,
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    /// Sends `(allow_create, allow_drop, done)` to trigger an immediate check.
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}

impl ConsistentCheckTask {
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let min_refresh_duration = engine
            .batching_engine()
            .batch_opts
            .experimental_min_refresh_duration;
        let frontend_scan_timeout = engine
            .batching_engine()
            .batch_opts
            .experimental_frontend_scan_timeout;
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // Wait for a frontend first, since the batching engine needs one
            // to execute queries.
            if let Err(err) = engine
                .wait_for_available_frontend(frontend_scan_timeout)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // Recover all flows found in the metadata, retrying until it succeeds.
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    min_refresh_duration.as_secs()
                );
                tokio::time::sleep(min_refresh_duration).await;
            }

            engine.set_done_recovering();

            // Keep checking consistency: periodically with both flags off, or
            // on demand with the flags carried by the trigger message.
            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistency");
                }
                // Notify the caller that triggered this round, if any.
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }

    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.trigger_tx
            .send((allow_create, allow_drop, tx))
            .await
            .map_err(|_| {
                IllegalCheckTaskStateSnafu {
                    reason: "Failed to send trigger signal",
                }
                .build()
            })?;
        // Wait until the triggered check round completes.
        rx.await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to receive trigger signal",
            }
            .build()
        })?;
        Ok(())
    }

    async fn stop(self) -> Result<(), Error> {
        self.shutdown_tx.send(()).await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to send shutdown signal",
            }
            .build()
        })?;
        // Abort as well, in case the task is stuck in a check round.
        self.handle.abort();
        Ok(())
    }
}

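/// An index from source table ids to the flows that read from them, kept
/// separately for streaming and batching flows so inserts can be routed to
/// the right engine.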
#[derive(Default)]
struct SrcTableToFlow {
    /// Source table id -> flows in the streaming engine reading from it.
    stream: HashMap<TableId, HashSet<FlowId>>,
    /// Source table id -> flows in the batching engine reading from it.
    batch: HashMap<TableId, HashSet<FlowId>>,
    /// Flow id -> (flow type, source table ids).
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}

impl SrcTableToFlow {
    fn in_stream(&self, table_id: TableId) -> bool {
        self.stream.contains_key(&table_id)
    }

    fn in_batch(&self, table_id: TableId) -> bool {
        self.batch.contains_key(&table_id)
    }

    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
        let mapping = match flow_type {
            FlowType::Streaming => &mut self.stream,
            FlowType::Batching => &mut self.batch,
        };

        for &src_table in &src_table_ids {
            mapping.entry(src_table).or_default().insert(flow_id);
        }
        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
    }

    fn remove_flow(&mut self, flow_id: FlowId) {
        let mapping = match self.get_flow_type(flow_id) {
            Some(FlowType::Streaming) => &mut self.stream,
            Some(FlowType::Batching) => &mut self.batch,
            None => return,
        };
        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
            for src_table in src_table_ids {
                if let Some(flows) = mapping.get_mut(&src_table) {
                    flows.remove(&flow_id);
                }
            }
        }
    }

    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
        self.flow_infos
            .get(&flow_id)
            .map(|(flow_type, _)| flow_type)
            .cloned()
    }
}

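// Route every operation to the engine that owns the flow, using the
// `SrcTableToFlow` index to decide.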
impl FlowEngine for FlowDualEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let flow_type = args
            .flow_options
            .get(FlowType::FLOW_TYPE_KEY)
            .map(|s| s.as_str());

        let flow_type = match flow_type {
            Some(FlowType::BATCHING) => FlowType::Batching,
            Some(FlowType::STREAMING) => FlowType::Streaming,
            // Batching is the default flow type.
            None => FlowType::Batching,
            Some(flow_type) => {
                return InternalSnafu {
                    reason: format!("Invalid flow type: {}", flow_type),
                }
                .fail();
            }
        };

        let flow_id = args.flow_id;
        let src_table_ids = args.source_table_ids.clone();

        let res = match flow_type {
            FlowType::Batching => self.batching_engine.create_flow(args).await,
            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
        }?;

        // Register the new flow in the source-table index.
        self.src_table2flow
            .write()
            .await
            .add_flow(flow_id, flow_type, src_table_ids);

        Ok(res)
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);

        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
            None => {
                // Ask the check task to reconcile, with drops allowed.
                warn!(
                    "Flow {} does not exist in the underlying engine, but exists in metadata",
                    flow_id
                );
                self.try_sync_with_check_task(flow_id, true).await?;

                Ok(())
            }
        }?;
        // Deregister the flow from the source-table index.
        self.src_table2flow.write().await.remove_flow(flow_id);
        Ok(())
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        // Make sure the flow is fully created before flushing it.
        self.try_sync_with_check_task(flow_id, false).await?;
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
            None => {
                warn!(
                    "Flow {flow_id} doesn't currently exist in this flownode, ignoring flush_flow request"
                );
                Ok(0)
            }
        }
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
            None => Ok(false),
        }
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }

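    /// Split the insert requests by source table and forward them to the
    /// engines whose flows read from those tables; a request feeding both
    /// engine types is cloned to each.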
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.wait_for_all_flow_recover(request.requests.len())
            .await?;
        // Requests whose source table feeds a streaming flow are cloned out;
        // the rest stay in place for the batching engine.
        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
        let mut to_batch_engine = request.requests;

        let mut batching_row_cnt = 0;
        let mut streaming_row_cnt = 0;

        {
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
                let table_id = region_id.table_id();
                let is_in_stream = src_table2flow.in_stream(table_id);
                let is_in_batch = src_table2flow.in_batch(table_id);
                if is_in_stream {
                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    to_stream_engine.push(req.clone());
                }
                if is_in_batch {
                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    return true;
                }
                if !is_in_stream {
                    warn!("Table {} is not a source table of any flow", table_id);
                }
                false
            });
            // Drop the read lock before dispatching the requests.
        }

        METRIC_FLOW_ROWS
            .with_label_values(&["in-streaming"])
            .inc_by(streaming_row_cnt as u64);

        METRIC_FLOW_ROWS
            .with_label_values(&["in-batching"])
            .inc_by(batching_row_cnt as u64);

        // Handle both engines concurrently: streaming in a spawned task,
        // batching on the current task.
        let streaming_engine = self.streaming_engine.clone();
        let stream_handler: JoinHandle<Result<(), Error>> =
            common_runtime::spawn_global(async move {
                streaming_engine
                    .handle_flow_inserts(api::v1::region::InsertRequests {
                        requests: to_stream_engine,
                    })
                    .await?;
                Ok(())
            });
        self.batching_engine
            .handle_flow_inserts(api::v1::region::InsertRequests {
                requests: to_batch_engine,
            })
            .await?;
        stream_handler.await.context(JoinTaskSnafu)??;

        Ok(())
    }

    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        // Only the batching engine tracks dirty time windows.
        self.batching_engine.handle_mark_window_dirty(req).await
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowDualEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                eval_interval,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    eval_interval: eval_interval.map(|e| e.seconds),
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }

    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> MetaResult<FlowResponse> {
        self.batching_engine()
            .handle_mark_dirty_time_window(req)
            .await
            .map(|_| FlowResponse::default())
            .map_err(to_meta_err(snafu::location!()))
    }
}

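/// Convert a flow error into a `common_meta` error, preserving `FlowNotFound`.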
fn to_meta_err(
    location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
    move |err: crate::error::Error| -> common_meta::error::Error {
        match err {
            crate::error::Error::FlowNotFound { id, .. } => {
                common_meta::error::Error::FlowNotFound {
                    flow_name: format!("flow_id={id}"),
                    location,
                }
            }
            _ => common_meta::error::Error::External {
                location,
                source: BoxedError::new(err),
            },
        }
    }
}

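// The streaming engine implements `FlowEngine` by delegating to its inherent
// `*_inner` methods.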
impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }

    async fn handle_mark_window_dirty(
        &self,
        _req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        UnsupportedSnafu {
            reason: "handle_mark_window_dirty in streaming engine",
        }
        .fail()
    }
}

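/// Where to fetch a column value when reordering an incoming row into table
/// column order.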
#[derive(Debug, Clone)]
enum FetchFromRow {
    /// Take the value at this index in the incoming row.
    Idx(usize),
    /// Use this precomputed default value.
    Default(Value),
}

impl FetchFromRow {
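    /// Fetch a value from the row, or fall back to the stored default.
    ///
    /// The unwrap assumes `Idx` is within bounds, which holds by construction.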
    fn fetch(&self, row: &repr::Row) -> Value {
        match self {
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

impl StreamingEngine {
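    /// Reorder each incoming row to match the table schema (filling defaults
    /// for missing columns), then feed the rows to the dataflow as insertions.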
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
        // Best-effort read guard: if a flush holds the write lock, proceed anyway.
        let _flush_lock = self.flush_lock.try_read();
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            // The timestamp assigned to all rows in this batch.
            let now = self.tick_manager.tick();

            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                let table_col_names = table_col_names
                    .iter()
                    .enumerate()
                    .map(|(idx, name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!(
                                "Expect column {idx} of table id={table_id} to have a name in the table schema, found None"
                            ),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                // For each table column, record where its value comes from in the
                // insert request: a column index if present, the default otherwise.
                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals.into_iter())
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

            // Reorder each incoming row into table-column order, then attach
            // the timestamp and a +1 diff (insertion).
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}