use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use api::v1::flow::{
    flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};

pub type FlowDualEngineRef = Arc<FlowDualEngine>;

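/// A combined flow engine that dispatches each flow to either the streaming
/// engine or the batching engine according to its [`FlowType`], and keeps the
/// set of running flows consistent with flow metadata in the background.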
pub struct FlowDualEngine {
    streaming_engine: Arc<StreamingEngine>,
    batching_engine: Arc<BatchingEngine>,
    /// Maps source table ids to the flows that read from them, so inserts can
    /// be routed to the proper engine.
    src_table2flow: RwLock<SrcTableToFlow>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    /// The background task that keeps flows consistent with metadata.
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    /// Whether flow recovery after startup has finished.
    done_recovering: AtomicBool,
}

impl FlowDualEngine {
    pub fn new(
        streaming_engine: Arc<StreamingEngine>,
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
            batching_engine,
            src_table2flow: RwLock::new(SrcTableToFlow::default()),
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
            done_recovering: AtomicBool::new(false),
        }
    }

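    /// Mark flow recovery as done, unblocking insert requests waiting in
    /// [`Self::wait_for_all_flow_recover`].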
    pub fn set_done_recovering(&self) {
        info!("FlowDualEngine done recovering");
        self.done_recovering
            .store(true, std::sync::atomic::Ordering::Release);
    }

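    /// Whether flow recovery after startup has finished.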
    pub fn is_recover_done(&self) -> bool {
        self.done_recovering
            .load(std::sync::atomic::Ordering::Acquire)
    }

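    /// Wait until flow recovery is done, polling a few times before giving
    /// up; `waiting_req_cnt` is only used to log how many insert requests
    /// are blocked on recovery.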
    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
        if self.is_recover_done() {
            return Ok(());
        }

        warn!(
            "FlowDualEngine is not done recovering, {} insert requests are waiting for recovery",
            waiting_req_cnt
        );
        // Poll the recovery flag a few times before giving up.
        let mut retry = 0;
        let max_retry = 3;
        while retry < max_retry && !self.is_recover_done() {
            warn!(
                "FlowDualEngine is not done recovering, retry {} in 1s",
                retry
            );
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            retry += 1;
        }
        // Re-check the flag itself rather than the retry counter: recovery may
        // have completed during the last sleep, when `retry` already equals
        // `max_retry`.
        if !self.is_recover_done() {
            return FlowNotRecoveredSnafu.fail();
        }
        info!("FlowDualEngine is done recovering");
        Ok(())
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Whether this flownode runs in distributed mode, which is the case when
    /// it has been assigned a node id.
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }

    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
        self.batching_engine.clone()
    }

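    /// In distributed mode, block until at least one frontend shows up in the
    /// cluster info, polling once per second up to `timeout`; a no-op in
    /// standalone mode.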
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            let elapsed = now.elapsed();
            tokio::time::sleep(sleep_duration).await;
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
        }
    }

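    /// Ask the consistency-check task to run one check immediately and wait
    /// for it to finish, retrying until the task is available. `allow_drop`
    /// controls whether that check may drop flows absent from metadata.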
    async fn try_sync_with_check_task(
        &self,
        flow_id: FlowId,
        allow_drop: bool,
    ) -> Result<(), Error> {
        info!("Trying to sync with check task for flow {}", flow_id);
        let mut retry = 0;
        let max_retry = 10;
        while retry < max_retry {
            if let Some(task) = self.check_task.lock().await.as_ref() {
                task.trigger(false, allow_drop).await?;
                break;
            }
            retry += 1;
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        if retry == max_retry {
            error!(
                "Can't sync with check task for flow {} with allow_drop={}",
                flow_id, allow_drop
            );
            return SyncCheckTaskSnafu {
                flow_id,
                allow_drop,
            }
            .fail();
        }
        info!("Successfully synced with check task for flow {}", flow_id);

        Ok(())
    }

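    /// Compare the flows this node should run (according to flow metadata)
    /// against the flows actually running, then recreate the missing ones if
    /// `allow_create` and drop the stale ones if `allow_drop`.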
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        let nodeid = self.streaming_engine.node_id;
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // The flow already exists in metadata, so recreate it
                        // idempotently.
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            .or_else(|| {
                                // Fall back to a default query context in the
                                // flow's catalog.
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recovered flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows exist in metadata but not in flownode {:?}, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows exist in flownode {:?} but not in metadata, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }

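    /// Spawn the background consistency-check task. Errors if one is already
    /// running.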
    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
        let mut check_task = self.check_task.lock().await;
        ensure!(
            check_task.is_none(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task already exists",
            }
        );
        let task = ConsistentCheckTask::start_check_task(self).await?;
        *check_task = Some(task);
        Ok(())
    }

    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
        info!("Stopping flow consistent check task");
        let mut check_task = self.check_task.lock().await;

        ensure!(
            check_task.is_some(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task does not exist",
            }
        );

        check_task.take().unwrap().stop().await?;
        info!("Stopped flow consistent check task");
        Ok(())
    }

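    /// Whether the flow still exists in flow metadata.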
    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_metadata_manager
            .flow_info_manager()
            .get(flow_id as u32)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
            .map(|info| info.is_some())
    }
}

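/// A background task that periodically reconciles running flows with flow
/// metadata, and can also be triggered on demand.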
struct ConsistentCheckTask {
    handle: JoinHandle<()>,
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    /// Sends `(allow_create, allow_drop, done)` to request an immediate check.
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}

impl ConsistentCheckTask {
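    /// Spawn the check loop: wait for a frontend, retry recovery until it
    /// succeeds, mark the engine recovered, then keep checking on
    /// shutdown/trigger/10s-timer events.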
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // Failing to find a frontend here is not fatal; the recovery loop
            // below will keep retrying.
            if let Err(err) = engine
                .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // Retry recovery until it succeeds.
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    MIN_REFRESH_DURATION.as_secs()
                );
                tokio::time::sleep(MIN_REFRESH_DURATION).await;
            }

            engine.set_done_recovering();

            // Periodic checks neither create nor drop flows unless explicitly
            // triggered with the corresponding flags.
            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistency");
                }
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }

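    /// Request one check with the given flags and wait until it completes.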
    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.trigger_tx
            .send((allow_create, allow_drop, tx))
            .await
            .map_err(|_| {
                IllegalCheckTaskStateSnafu {
                    reason: "Failed to send trigger signal",
                }
                .build()
            })?;
        rx.await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to receive trigger signal",
            }
            .build()
        })?;
        Ok(())
    }

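    /// Signal the check loop to shut down, then abort the task handle.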
    async fn stop(self) -> Result<(), Error> {
        self.shutdown_tx.send(()).await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to send shutdown signal",
            }
            .build()
        })?;
        self.handle.abort();
        Ok(())
    }
}

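/// An index from source table ids to the flows that consume them, kept
/// separately for streaming and batching flows.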
#[derive(Default)]
struct SrcTableToFlow {
    /// Source table id -> streaming flows that read from it.
    stream: HashMap<TableId, HashSet<FlowId>>,
    /// Source table id -> batching flows that read from it.
    batch: HashMap<TableId, HashSet<FlowId>>,
    /// Flow id -> (flow type, source table ids).
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}

impl SrcTableToFlow {
    fn in_stream(&self, table_id: TableId) -> bool {
        self.stream.contains_key(&table_id)
    }

    fn in_batch(&self, table_id: TableId) -> bool {
        self.batch.contains_key(&table_id)
    }

    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
        let mapping = match flow_type {
            FlowType::Streaming => &mut self.stream,
            FlowType::Batching => &mut self.batch,
        };

        for src_table in src_table_ids.clone() {
            mapping
                .entry(src_table)
                .and_modify(|flows| {
                    flows.insert(flow_id);
                })
                .or_insert_with(|| {
                    let mut set = HashSet::new();
                    set.insert(flow_id);
                    set
                });
        }
        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
    }

    fn remove_flow(&mut self, flow_id: FlowId) {
        let mapping = match self.get_flow_type(flow_id) {
            Some(FlowType::Streaming) => &mut self.stream,
            Some(FlowType::Batching) => &mut self.batch,
            None => return,
        };
        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
            for src_table in src_table_ids {
                if let Some(flows) = mapping.get_mut(&src_table) {
                    flows.remove(&flow_id);
                }
            }
        }
    }

    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
        self.flow_infos
            .get(&flow_id)
            .map(|(flow_type, _)| flow_type)
            .cloned()
    }
}

impl FlowEngine for FlowDualEngine {
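    /// Create the flow in the engine chosen by the `flow_type` option
    /// (defaulting to batching), then record its source-table mapping.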
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let flow_type = args
            .flow_options
            .get(FlowType::FLOW_TYPE_KEY)
            .map(|s| s.as_str());

        let flow_type = match flow_type {
            Some(FlowType::BATCHING) => FlowType::Batching,
            Some(FlowType::STREAMING) => FlowType::Streaming,
            None => FlowType::Batching,
            Some(flow_type) => {
                return InternalSnafu {
                    reason: format!("Invalid flow type: {}", flow_type),
                }
                .fail()
            }
        };

        let flow_id = args.flow_id;
        let src_table_ids = args.source_table_ids.clone();

        let res = match flow_type {
            FlowType::Batching => self.batching_engine.create_flow(args).await,
            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
        }?;

        self.src_table2flow
            .write()
            .await
            .add_flow(flow_id, flow_type, src_table_ids);

        Ok(res)
    }

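    /// Remove the flow from whichever engine runs it; if neither engine knows
    /// the flow, sync with the consistency-check task (allowing drops) before
    /// cleaning up the source-table mapping.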
    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);

        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
            None => {
                warn!(
                    "Flow {} does not exist in the underlying engine, but exists in metadata",
                    flow_id
                );
                self.try_sync_with_check_task(flow_id, true).await?;

                Ok(())
            }
        }?;
        // Drop the flow from the source-table mapping.
        self.src_table2flow.write().await.remove_flow(flow_id);
        Ok(())
    }

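    /// Flush the flow, first syncing with the consistency-check task so a
    /// freshly created flow is visible before flushing.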
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.try_sync_with_check_task(flow_id, false).await?;
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
            None => {
                warn!(
                    "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
                );
                Ok(0)
            }
        }
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
            None => Ok(false),
        }
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }

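    /// Split incoming inserts by source table: requests for tables feeding
    /// streaming flows go to the streaming engine, those feeding batching
    /// flows to the batching engine; a table may feed both, in which case the
    /// request is sent to both engines.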
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.wait_for_all_flow_recover(request.requests.len())
            .await?;
        // Requests retained below stay in `to_batch_engine`; streaming
        // requests are copied out.
        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
        let mut to_batch_engine = request.requests;

        {
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
                let table_id = region_id.table_id();
                let is_in_stream = src_table2flow.in_stream(table_id);
                let is_in_batch = src_table2flow.in_batch(table_id);
                if is_in_stream {
                    to_stream_engine.push(req.clone());
                }
                if is_in_batch {
                    return true;
                }
                if !is_in_batch && !is_in_stream {
                    warn!("Table {} is not the source table of any flow", table_id)
                }
                false
            });
            // Drop the read lock before dispatching to the engines.
        }

        let streaming_engine = self.streaming_engine.clone();
        let stream_handler: JoinHandle<Result<(), Error>> =
            common_runtime::spawn_global(async move {
                streaming_engine
                    .handle_flow_inserts(api::v1::region::InsertRequests {
                        requests: to_stream_engine,
                    })
                    .await?;
                Ok(())
            });
        self.batching_engine
            .handle_flow_inserts(api::v1::region::InsertRequests {
                requests: to_batch_engine,
            })
            .await?;
        stream_handler.await.context(JoinTaskSnafu)??;

        Ok(())
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowDualEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }
}

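/// Convert a flow [`crate::error::Error`] into a `common_meta` error at the
/// given caller location, mapping `FlowNotFound` specially.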
fn to_meta_err(
    location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
    move |err: crate::error::Error| -> common_meta::error::Error {
        match err {
            crate::error::Error::FlowNotFound { id, .. } => {
                common_meta::error::Error::FlowNotFound {
                    flow_name: format!("flow_id={id}"),
                    location,
                }
            }
            _ => common_meta::error::Error::External {
                location,
                source: BoxedError::new(err),
            },
        }
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow_inner(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        self.handle_inserts_inner(request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }
}

impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
}

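/// Where to fetch a column's value when reordering an insert row to the table
/// schema: either an index into the incoming row, or a precomputed default
/// value for a column the request does not carry.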
#[derive(Debug, Clone)]
enum FetchFromRow {
    Idx(usize),
    Default(Value),
}

impl FetchFromRow {
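    /// Fetch the value from the row at the stored index, or return the
    /// default value. Panics if the index is out of bounds.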
    fn fetch(&self, row: &repr::Row) -> Value {
        match self {
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

impl StreamingEngine {
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
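        // Best-effort: take the flush lock as a reader if it is immediately
        // available; if a flush currently holds it, proceed without blocking.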
        let _flush_lock = self.flush_lock.try_read();
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            // The current tick is used as the timestamp for every row in this
            // request.
            let now = self.tick_manager.tick();

            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
                // Precompute per-column default values for columns missing
                // from the insert request.
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                let table_col_names = table_col_names
                    .iter()
                    .enumerate()
                    .map(|(idx, name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!(
                                "Expect column {idx} of table id={table_id} to have a name in the table schema, found None"
                            ),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals.into_iter())
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

            // Reorder each row to the table's column order, filling in
            // defaults for missing columns.
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}