use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use api::v1::flow::{
    CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse, FlushFlow,
    flow_request,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{IntoError, OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};

pub type FlowDualEngineRef = Arc<FlowDualEngine>;

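/// A dual-mode flow engine that routes each flow to either the
/// [`StreamingEngine`] or the [`BatchingEngine`] based on its [`FlowType`],
/// while tracking which source tables feed which flows so that incoming
/// inserts can be dispatched to the right engine.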
pub struct FlowDualEngine {
    streaming_engine: Arc<StreamingEngine>,
    batching_engine: Arc<BatchingEngine>,
    src_table2flow: RwLock<SrcTableToFlow>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    done_recovering: AtomicBool,
}

impl FlowDualEngine {
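    /// Creates a new `FlowDualEngine` from its two underlying engines and the
    /// metadata/catalog managers.
    ///
    /// A minimal usage sketch (hypothetical setup; assumes the surrounding
    /// services were constructed elsewhere):
    ///
    /// ```ignore
    /// let engine = Arc::new(FlowDualEngine::new(
    ///     streaming_engine,
    ///     batching_engine,
    ///     flow_metadata_manager,
    ///     catalog_manager,
    ///     plugins,
    /// ));
    /// // Keep engine state consistent with flow metadata in the background.
    /// engine.start_flow_consistent_check_task().await?;
    /// ```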
    pub fn new(
        streaming_engine: Arc<StreamingEngine>,
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
            batching_engine,
            src_table2flow: RwLock::new(SrcTableToFlow::default()),
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
            done_recovering: AtomicBool::new(false),
        }
    }

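    /// Marks recovery as complete. The `Release` store pairs with the
    /// `Acquire` load in [`Self::is_recover_done`], so readers that observe
    /// `true` also observe all writes made during recovery.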
    pub fn set_done_recovering(&self) {
        info!("FlowDualEngine done recovering");
        self.done_recovering
            .store(true, std::sync::atomic::Ordering::Release);
    }

    pub fn is_recover_done(&self) -> bool {
        self.done_recovering
            .load(std::sync::atomic::Ordering::Acquire)
    }

    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
        if self.is_recover_done() {
            return Ok(());
        }

        warn!(
            "FlowDualEngine is not done recovering, {} insert requests are waiting for recovery",
            waiting_req_cnt
        );
        let mut retry = 0;
        let max_retry = 3;
        while retry < max_retry && !self.is_recover_done() {
            warn!(
                "FlowDualEngine is not done recovering, retry {} in 1s",
                retry
            );
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            retry += 1;
        }
        // Re-check the flag rather than the retry counter, so recovery that
        // finishes on the last retry is not reported as a failure.
        if !self.is_recover_done() {
            return FlowNotRecoveredSnafu.fail();
        }
        info!("FlowDualEngine is done recovering");
        Ok(())
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }

    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
        self.batching_engine.clone()
    }

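    /// In distributed mode, polls the cluster info until at least one
    /// frontend is available or `timeout` elapses; standalone mode returns
    /// immediately.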
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            // Check the timeout before sleeping, so we don't sleep one extra
            // round after the deadline has already passed.
            let elapsed = now.elapsed();
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            tokio::time::sleep(sleep_duration).await;
        }
    }

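    /// Asks the consistency-check task to run one reconciliation pass and
    /// waits for it to finish, retrying while the task has not started yet.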
    async fn try_sync_with_check_task(
        &self,
        flow_id: FlowId,
        allow_drop: bool,
    ) -> Result<(), Error> {
        info!("Trying to sync with check task for flow {}", flow_id);
        let mut retry = 0;
        let max_retry = 10;
        while retry < max_retry {
            if let Some(task) = self.check_task.lock().await.as_ref() {
                task.trigger(false, allow_drop).await?;
                break;
            }
            retry += 1;
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        if retry == max_retry {
            error!(
                "Can't sync with check task for flow {} with allow_drop={}",
                flow_id, allow_drop
            );
            return SyncCheckTaskSnafu {
                flow_id,
                allow_drop,
            }
            .fail();
        }
        info!("Successfully synced with check task for flow {}", flow_id);

        Ok(())
    }

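    /// Reconciles the flows recorded in metadata with the flows actually
    /// running in the two engines: missing flows are recreated when
    /// `allow_create` is set, and stale flows are dropped when `allow_drop`
    /// is set; otherwise mismatches are only logged.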
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        let nodeid = self.streaming_engine.node_id;
        let should_exist: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recovered = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recovered.into_iter().map(|(id, _)| id).collect()
        } else {
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exist = should_exist
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exist = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exist
            .iter()
            .filter(|id| !actual_exist.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exist
            .iter()
            .filter(|id| !should_exist.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        eval_interval: info.eval_interval(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            .or_else(|| {
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recovered flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows exist in metadata but not in flownode {:?}, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows exist in flownode {:?} but not in metadata, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }

    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
        let mut check_task = self.check_task.lock().await;
        ensure!(
            check_task.is_none(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task already exists",
            }
        );
        let task = ConsistentCheckTask::start_check_task(self).await?;
        *check_task = Some(task);
        Ok(())
    }

    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
        info!("Stopping flow consistent check task");
        let mut check_task = self.check_task.lock().await;

        ensure!(
            check_task.is_some(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task does not exist",
            }
        );

        check_task.take().unwrap().stop().await?;
        info!("Stopped flow consistent check task");
        Ok(())
    }

    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_metadata_manager
            .flow_info_manager()
            .get(flow_id as u32)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
            .map(|info| info.is_some())
    }
}

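/// Background task that periodically reconciles engine state with flow
/// metadata. `shutdown_tx` stops the loop, while `trigger_tx` requests an
/// immediate pass with explicit `(allow_create, allow_drop)` flags plus a
/// oneshot sender used to signal completion back to the caller.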
struct ConsistentCheckTask {
    handle: JoinHandle<()>,
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}

impl ConsistentCheckTask {
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let min_refresh_duration = engine
            .batching_engine()
            .batch_opts
            .experimental_min_refresh_duration;
        let frontend_scan_timeout = engine
            .batching_engine()
            .batch_opts
            .experimental_frontend_scan_timeout;
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            if let Err(err) = engine
                .wait_for_available_frontend(frontend_scan_timeout)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    min_refresh_duration.as_secs()
                );
                tokio::time::sleep(min_refresh_duration).await;
            }

            engine.set_done_recovering();

            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }

504
505 async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
506 let (tx, rx) = tokio::sync::oneshot::channel();
507 self.trigger_tx
508 .send((allow_create, allow_drop, tx))
509 .await
510 .map_err(|_| {
511 IllegalCheckTaskStateSnafu {
512 reason: "Failed to send trigger signal",
513 }
514 .build()
515 })?;
516 rx.await.map_err(|_| {
517 IllegalCheckTaskStateSnafu {
518 reason: "Failed to receive trigger signal",
519 }
520 .build()
521 })?;
522 Ok(())
523 }
524
525 async fn stop(self) -> Result<(), Error> {
526 self.shutdown_tx.send(()).await.map_err(|_| {
527 IllegalCheckTaskStateSnafu {
528 reason: "Failed to send shutdown signal",
529 }
530 .build()
531 })?;
532 self.handle.abort();
534 Ok(())
535 }
536}
537
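/// Bidirectional bookkeeping between source tables and flows: per-engine maps
/// from a source table id to the flows reading it, plus the flow type and
/// source tables of every known flow.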
#[derive(Default)]
struct SrcTableToFlow {
    stream: HashMap<TableId, HashSet<FlowId>>,
    batch: HashMap<TableId, HashSet<FlowId>>,
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}

impl SrcTableToFlow {
    fn in_stream(&self, table_id: TableId) -> bool {
        self.stream.contains_key(&table_id)
    }

    fn in_batch(&self, table_id: TableId) -> bool {
        self.batch.contains_key(&table_id)
    }

    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
        let mapping = match flow_type {
            FlowType::Streaming => &mut self.stream,
            FlowType::Batching => &mut self.batch,
        };

        for src_table in src_table_ids.iter().copied() {
            mapping.entry(src_table).or_default().insert(flow_id);
        }
        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
    }

    fn remove_flow(&mut self, flow_id: FlowId) {
        let mapping = match self.get_flow_type(flow_id) {
            Some(FlowType::Streaming) => &mut self.stream,
            Some(FlowType::Batching) => &mut self.batch,
            None => return,
        };
        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
            for src_table in src_table_ids {
                if let Some(flows) = mapping.get_mut(&src_table) {
                    flows.remove(&flow_id);
                    // Prune empty entries so inserts for this table are no
                    // longer routed to an engine with no interested flows.
                    if flows.is_empty() {
                        mapping.remove(&src_table);
                    }
                }
            }
        }
    }

    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
        self.flow_infos
            .get(&flow_id)
            .map(|(flow_type, _)| flow_type)
            .cloned()
    }
}

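// `FlowDualEngine` implements `FlowEngine` by dispatching each call to the
// engine that owns the flow. The flow type is chosen at creation time from
// the `FLOW_TYPE_KEY` option and defaults to batching mode. A hedged sketch
// of the option map a caller might pass (values assumed, not verified):
//
//   let mut flow_options = HashMap::new();
//   flow_options.insert(
//       FlowType::FLOW_TYPE_KEY.to_string(),
//       FlowType::BATCHING.to_string(),
//   );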
impl FlowEngine for FlowDualEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let flow_type = args
            .flow_options
            .get(FlowType::FLOW_TYPE_KEY)
            .map(|s| s.as_str());

        let flow_type = match flow_type {
            Some(FlowType::BATCHING) => FlowType::Batching,
            Some(FlowType::STREAMING) => FlowType::Streaming,
            // Default to batching mode when no flow type is specified.
            None => FlowType::Batching,
            Some(flow_type) => {
                return InternalSnafu {
                    reason: format!("Invalid flow type: {}", flow_type),
                }
                .fail();
            }
        };

        let flow_id = args.flow_id;
        let src_table_ids = args.source_table_ids.clone();

        let res = match flow_type {
            FlowType::Batching => self.batching_engine.create_flow(args).await,
            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
        }?;

        self.src_table2flow
            .write()
            .await
            .add_flow(flow_id, flow_type, src_table_ids);

        Ok(res)
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);

        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
            None => {
                warn!(
                    "Flow {} does not exist in the underlying engine, but exists in metadata",
                    flow_id
                );
                self.try_sync_with_check_task(flow_id, true).await?;

                Ok(())
            }
        }?;
        self.src_table2flow.write().await.remove_flow(flow_id);
        Ok(())
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.try_sync_with_check_task(flow_id, false).await?;
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
            None => {
                warn!(
                    "Currently flow={flow_id} doesn't exist in flownode, ignoring flush_flow request"
                );
                Ok(0)
            }
        }
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
            None => Ok(false),
        }
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }

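    /// Routes incoming insert requests to the engines whose flows read from
    /// the written tables: requests for streaming-flow source tables are
    /// cloned to the streaming engine, requests for batching-flow source
    /// tables stay in the batch list, and a single request may go to both.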
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.wait_for_all_flow_recover(request.requests.len())
            .await?;
        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
        let mut to_batch_engine = request.requests;

        let mut batching_row_cnt = 0;
        let mut streaming_row_cnt = 0;

        {
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
                let table_id = region_id.table_id();
                let is_in_stream = src_table2flow.in_stream(table_id);
                let is_in_batch = src_table2flow.in_batch(table_id);
                if is_in_stream {
                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    to_stream_engine.push(req.clone());
                }
                if is_in_batch {
                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    return true;
                }
                // Not a batching source table at this point; warn if it feeds
                // no flow at all.
                if !is_in_stream {
                    warn!("Table {} is not a source table of any flow", table_id);
                }
                false
            });
        }

        METRIC_FLOW_ROWS
            .with_label_values(&["in-streaming"])
            .inc_by(streaming_row_cnt as u64);

        METRIC_FLOW_ROWS
            .with_label_values(&["in-batching"])
            .inc_by(batching_row_cnt as u64);

        let streaming_engine = self.streaming_engine.clone();
        let stream_handler: JoinHandle<Result<(), Error>> =
            common_runtime::spawn_global(async move {
                streaming_engine
                    .handle_flow_inserts(api::v1::region::InsertRequests {
                        requests: to_stream_engine,
                    })
                    .await?;
                Ok(())
            });
        self.batching_engine
            .handle_flow_inserts(api::v1::region::InsertRequests {
                requests: to_batch_engine,
            })
            .await?;
        stream_handler.await.context(JoinTaskSnafu)??;

        Ok(())
    }

    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        self.batching_engine.handle_mark_window_dirty(req).await
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowDualEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                eval_interval,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    eval_interval: eval_interval.map(|e| e.seconds),
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }

    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> MetaResult<FlowResponse> {
        self.batching_engine()
            .handle_mark_dirty_time_window(req)
            .await
            .map(|_| FlowResponse::default())
            .map_err(to_meta_err(snafu::location!()))
    }
}

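/// Converts a flow error into a `common_meta` error, preserving the
/// flow-not-found case and wrapping everything else as an external error.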
fn to_meta_err(
    location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
    move |err: crate::error::Error| -> common_meta::error::Error {
        match err {
            crate::error::Error::FlowNotFound { id, .. } => {
                common_meta::error::Error::FlowNotFound {
                    flow_name: format!("flow_id={id}"),
                    location,
                }
            }
            _ => common_meta::error::Error::External {
                location,
                source: BoxedError::new(err),
            },
        }
    }
}

impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }

    async fn handle_mark_window_dirty(
        &self,
        _req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        UnsupportedSnafu {
            reason: "handle_mark_window_dirty in streaming engine",
        }
        .fail()
    }
}

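/// Where to fetch a column's value when reordering an insert row into table
/// schema order: either an index into the incoming row or a precomputed
/// default value.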
#[derive(Debug, Clone)]
enum FetchFromRow {
    Idx(usize),
    Default(Value),
}

impl FetchFromRow {
    /// Panics if the index is out of bounds, so callers must build
    /// `FetchFromRow::Idx` only from valid column indices of the row.
    fn fetch(&self, row: &repr::Row) -> Value {
        match self {
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

impl StreamingEngine {
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
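        // Best-effort read guard: if a flush currently holds the write lock,
        // `try_read` fails and we proceed without blocking (the `Err` result
        // holds no guard and is dropped at the end of the function).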
        let _flush_lock = self.flush_lock.try_read();
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            let now = self.tick_manager.tick();

            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                let table_col_names = table_col_names
                    .iter()
                    .enumerate()
                    .map(|(idx, name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!(
                                "Expect column {idx} of table id={table_id} to have name in table schema, found None"
                            ),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals.into_iter())
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

            // Reorder each incoming row into table schema order, tagging it
            // with the current tick and a diff of +1 (an insertion).
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}
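
// A small sanity check for `SrcTableToFlow` bookkeeping. This is a sketch
// added for illustration: the flow and table ids are arbitrary, and it only
// exercises the pure, in-memory mapping.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn src_table_to_flow_add_and_remove() {
        let mut mapping = SrcTableToFlow::default();
        mapping.add_flow(1, FlowType::Batching, vec![10, 11]);
        mapping.add_flow(2, FlowType::Streaming, vec![10]);

        assert!(mapping.in_batch(10) && mapping.in_batch(11));
        assert!(mapping.in_stream(10) && !mapping.in_stream(11));
        assert!(matches!(mapping.get_flow_type(1), Some(FlowType::Batching)));

        mapping.remove_flow(1);
        assert!(matches!(mapping.get_flow_type(1), None));
        // Empty entries are pruned, so table 11 no longer routes to batching.
        assert!(!mapping.in_batch(11));
        // Table 10 is still the source of streaming flow 2.
        assert!(mapping.in_stream(10));
    }
}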