use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use api::v1::flow::{
    flow_request, CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse,
    FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};

pub type FlowDualEngineRef = Arc<FlowDualEngine>;

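/// A flow engine that routes each flow to one of two backends: the streaming
/// engine for `FlowType::Streaming` flows and the batching engine for
/// `FlowType::Batching` flows, keeping both consistent with flow metadata.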
pub struct FlowDualEngine {
    streaming_engine: Arc<StreamingEngine>,
    batching_engine: Arc<BatchingEngine>,
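    /// Cached mapping from source table id to the flows that read from it,
    /// used to route incoming inserts to the owning engine(s).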
    src_table2flow: RwLock<SrcTableToFlow>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    done_recovering: AtomicBool,
}

impl FlowDualEngine {
    pub fn new(
        streaming_engine: Arc<StreamingEngine>,
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
            batching_engine,
            src_table2flow: RwLock::new(SrcTableToFlow::default()),
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
            done_recovering: AtomicBool::new(false),
        }
    }

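    /// Marks recovery as complete, unblocking insert requests that arrived early.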
    pub fn set_done_recovering(&self) {
        info!("FlowDualEngine done recovering");
        self.done_recovering
            .store(true, std::sync::atomic::Ordering::Release);
    }

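    /// Whether flow recovery has completed.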
    pub fn is_recover_done(&self) -> bool {
        self.done_recovering
            .load(std::sync::atomic::Ordering::Acquire)
    }

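    /// Waits for recovery to finish (up to three 1s retries) before letting
    /// insert requests through; fails with `FlowNotRecovered` on timeout.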
    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
        if self.is_recover_done() {
            return Ok(());
        }

        warn!(
            "FlowDualEngine is not done recovering, {} insert requests are waiting for recovery",
            waiting_req_cnt
        );
        let mut retry = 0;
        let max_retry = 3;
        while retry < max_retry && !self.is_recover_done() {
            warn!(
                "FlowDualEngine is not done recovering, retry {} of {} in 1s",
                retry, max_retry
            );
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            retry += 1;
        }
        if !self.is_recover_done() {
            return FlowNotRecoveredSnafu.fail();
        }
        info!("FlowDualEngine is done recovering");
        Ok(())
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

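    /// True when running in distributed mode, i.e. a flownode id is assigned.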
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }

    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
        self.batching_engine.clone()
    }

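    /// In distributed mode, polls the cluster once a second until a frontend
    /// becomes available or `timeout` elapses; a no-op in standalone mode.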
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            let elapsed = now.elapsed();
            tokio::time::sleep(sleep_duration).await;
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
        }
    }

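    /// Triggers one pass of the consistency-check task and waits for it to
    /// complete, retrying every 500ms while the task is still being installed.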
    async fn try_sync_with_check_task(
        &self,
        flow_id: FlowId,
        allow_drop: bool,
    ) -> Result<(), Error> {
        info!("Trying to sync with check task for flow {}", flow_id);
        let mut retry = 0;
        let max_retry = 10;
        while retry < max_retry {
            if let Some(task) = self.check_task.lock().await.as_ref() {
                task.trigger(false, allow_drop).await?;
                break;
            }
            retry += 1;
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        if retry == max_retry {
            error!(
                "Can't sync with check task for flow {} with allow_drop={}",
                flow_id, allow_drop
            );
            return SyncCheckTaskSnafu {
                flow_id,
                allow_drop,
            }
            .fail();
        }
        info!("Successfully synced with check task for flow {}", flow_id);

        Ok(())
    }

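    /// Compares the flows recorded in metadata against the flows actually
    /// running in both engines, recreating missing ones (if `allow_create`)
    /// and dropping stale ones (if `allow_drop`).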
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        let nodeid = self.streaming_engine.node_id;
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

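        // Recreate flows present in metadata but missing from the engines.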
        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        // Recovery is idempotent: tolerate an existing flow and
                        // replace any stale definition left in the engine.
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            .or_else(|| {
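                                // No query context was stored in metadata; fall
                                // back to one rooted at the flow's catalog.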
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recovered flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows are missing from the flownode for node {:?} but recreation is not allowed in this pass, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
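        // Drop flows still running in the engines but absent from metadata.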
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flows no longer exist in metadata for node {:?} but dropping is not allowed in this pass, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }

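    /// Spawns the background consistency-check task; fails if one already exists.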
    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
        let mut check_task = self.check_task.lock().await;
        ensure!(
            check_task.is_none(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task already exists",
            }
        );
        let task = ConsistentCheckTask::start_check_task(self).await?;
        *check_task = Some(task);
        Ok(())
    }

    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
        info!("Stopping flow consistent check task");
        let mut check_task = self.check_task.lock().await;

        ensure!(
            check_task.is_some(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task does not exist",
            }
        );

        check_task.take().unwrap().stop().await?;
        info!("Stopped flow consistent check task");
        Ok(())
    }

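    /// Whether the flow is still recorded in flow metadata.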
    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_metadata_manager
            .flow_info_manager()
            .get(flow_id as u32)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
            .map(|info| info.is_some())
    }
}

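/// Background task that reconciles the engines' running flows with flow
/// metadata, both periodically and on demand via [`ConsistentCheckTask::trigger`].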
struct ConsistentCheckTask {
    handle: JoinHandle<()>,
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
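    /// Carries `(allow_create, allow_drop, done)` for an on-demand pass; `done`
    /// is signalled once the pass finishes.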
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}

impl ConsistentCheckTask {
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
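        // The task waits for a frontend, retries recovery until it succeeds,
        // then settles into the reconciliation loop below.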
        let handle = common_runtime::spawn_global(async move {
            if let Err(err) = engine
                .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

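            // Retry initial recovery until it succeeds; inserts are held back
            // until `set_done_recovering` is called.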
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    MIN_REFRESH_DURATION.as_secs()
                );
                tokio::time::sleep(MIN_REFRESH_DURATION).await;
            }

            engine.set_done_recovering();

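            // Permissions for creating/dropping flows apply to the next pass
            // only; unsolicited periodic passes run with both disabled.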
            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }

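    /// Requests one reconciliation pass with the given permissions and waits
    /// until the task reports completion.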
    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.trigger_tx
            .send((allow_create, allow_drop, tx))
            .await
            .map_err(|_| {
                IllegalCheckTaskStateSnafu {
                    reason: "Failed to send trigger signal",
                }
                .build()
            })?;
        rx.await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to receive trigger signal",
            }
            .build()
        })?;
        Ok(())
    }

    async fn stop(self) -> Result<(), Error> {
        self.shutdown_tx.send(()).await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to send shutdown signal",
            }
            .build()
        })?;
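        // Abort the handle in case the loop is mid-pass and has not yet
        // observed the shutdown signal.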
        self.handle.abort();
        Ok(())
    }
}

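/// Mapping from source table ids to flow ids, kept separately for streaming
/// and batching flows, plus per-flow info so a flow can be unregistered
/// without consulting metadata.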
#[derive(Default)]
struct SrcTableToFlow {
    stream: HashMap<TableId, HashSet<FlowId>>,
    batch: HashMap<TableId, HashSet<FlowId>>,
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}

impl SrcTableToFlow {
    fn in_stream(&self, table_id: TableId) -> bool {
        self.stream.contains_key(&table_id)
    }

    fn in_batch(&self, table_id: TableId) -> bool {
        self.batch.contains_key(&table_id)
    }

    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
        let mapping = match flow_type {
            FlowType::Streaming => &mut self.stream,
            FlowType::Batching => &mut self.batch,
        };

        for &src_table in &src_table_ids {
            mapping.entry(src_table).or_default().insert(flow_id);
        }
        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
    }

    fn remove_flow(&mut self, flow_id: FlowId) {
        let mapping = match self.get_flow_type(flow_id) {
            Some(FlowType::Streaming) => &mut self.stream,
            Some(FlowType::Batching) => &mut self.batch,
            None => return,
        };
        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
            for src_table in src_table_ids {
                if let Some(flows) = mapping.get_mut(&src_table) {
                    flows.remove(&flow_id);
                }
            }
        }
    }

    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
        self.flow_infos
            .get(&flow_id)
            .map(|(flow_type, _)| flow_type)
            .cloned()
    }
}

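// The dual engine routes every `FlowEngine` call to whichever engine owns the
// flow, based on the cached flow-type mapping.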
impl FlowEngine for FlowDualEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let flow_type = args
            .flow_options
            .get(FlowType::FLOW_TYPE_KEY)
            .map(|s| s.as_str());

        let flow_type = match flow_type {
            Some(FlowType::BATCHING) => FlowType::Batching,
            Some(FlowType::STREAMING) => FlowType::Streaming,
            None => FlowType::Batching,
            Some(flow_type) => {
                return InternalSnafu {
                    reason: format!("Invalid flow type: {}", flow_type),
                }
                .fail()
            }
        };

        let flow_id = args.flow_id;
        let src_table_ids = args.source_table_ids.clone();

        let res = match flow_type {
            FlowType::Batching => self.batching_engine.create_flow(args).await,
            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
        }?;

        self.src_table2flow
            .write()
            .await
            .add_flow(flow_id, flow_type, src_table_ids);

        Ok(res)
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);

        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
            None => {
                warn!(
                    "Flow {} does not exist in the underlying engine, but exists in metadata",
                    flow_id
                );
                self.try_sync_with_check_task(flow_id, true).await?;

                Ok(())
            }
        }?;
        self.src_table2flow.write().await.remove_flow(flow_id);
        Ok(())
    }

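    /// Syncs with the check task first so the engine's view of the flow is
    /// current before the flush is dispatched.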
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.try_sync_with_check_task(flow_id, false).await?;
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
            None => {
                warn!(
                    "Currently flow={flow_id} doesn't exist in flownode, ignoring flush_flow request"
                );
                Ok(0)
            }
        }
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
            None => Ok(false),
        }
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }

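    /// Splits incoming inserts using the source-table mapping: requests read by
    /// streaming flows are cloned to the streaming engine, those read by
    /// batching flows stay with the batching engine; both run concurrently.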
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.wait_for_all_flow_recover(request.requests.len())
            .await?;
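        // A request may feed both engines: it is cloned into `to_stream_engine`
        // when a streaming flow reads the table, and retained in
        // `to_batch_engine` when a batching flow does.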
        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
        let mut to_batch_engine = request.requests;

        let mut batching_row_cnt = 0;
        let mut streaming_row_cnt = 0;

        // Scope the read guard so the lock is released before dispatching.
        {
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
                let table_id = region_id.table_id();
                let is_in_stream = src_table2flow.in_stream(table_id);
                let is_in_batch = src_table2flow.in_batch(table_id);
                if is_in_stream {
                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    to_stream_engine.push(req.clone());
                }
                if is_in_batch {
                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                    return true;
                }
                if !is_in_stream {
                    warn!("Table {} is not any flow's source table", table_id);
                }
                false
            });
        }

        METRIC_FLOW_ROWS
            .with_label_values(&["in-streaming"])
            .inc_by(streaming_row_cnt as u64);

        METRIC_FLOW_ROWS
            .with_label_values(&["in-batching"])
            .inc_by(batching_row_cnt as u64);

        let streaming_engine = self.streaming_engine.clone();
        let stream_handler: JoinHandle<Result<(), Error>> =
            common_runtime::spawn_global(async move {
                streaming_engine
                    .handle_flow_inserts(api::v1::region::InsertRequests {
                        requests: to_stream_engine,
                    })
                    .await?;
                Ok(())
            });
        self.batching_engine
            .handle_flow_inserts(api::v1::region::InsertRequests {
                requests: to_batch_engine,
            })
            .await?;
        stream_handler.await.context(JoinTaskSnafu)??;

        Ok(())
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowDualEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }

    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
        self.batching_engine()
            .handle_mark_dirty_time_window(DirtyWindowRequests {
                requests: vec![req],
            })
            .await
            .map_err(to_meta_err(snafu::location!()))
    }
}

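/// Maps a flow error to a `common_meta` error, preserving `FlowNotFound` so
/// callers can tell a missing flow apart from an internal failure.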
fn to_meta_err(
    location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
    move |err: crate::error::Error| -> common_meta::error::Error {
        match err {
            crate::error::Error::FlowNotFound { id, .. } => {
                common_meta::error::Error::FlowNotFound {
                    flow_name: format!("flow_id={id}"),
                    location,
                }
            }
            _ => common_meta::error::Error::External {
                location,
                source: BoxedError::new(err),
            },
        }
    }
}

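// Plain delegation: the actual implementations live in the streaming engine's
// inherent `*_inner` methods.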
impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
}

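/// How to produce one column of an output row: take the value at an index of
/// the incoming row, or fall back to the column's default value.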
#[derive(Debug, Clone)]
enum FetchFromRow {
    Idx(usize),
    Default(Value),
}

impl FetchFromRow {
    fn fetch(&self, row: &repr::Row) -> Value {
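        // `Idx` panics if out of range; indices are derived from the insert
        // schema when `fetch_order` is built, so they are expected to be valid.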
        match self {
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

impl StreamingEngine {
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
        let _flush_lock = self.flush_lock.try_read();
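        // Best-effort guard: `try_read` succeeds only if no flush holds the
        // write lock; on failure the inserts proceed without the guard.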
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            let now = self.tick_manager.tick();

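            // Resolve, per table: the column types, and for each table column
            // where to fetch its value from the insert payload (or a default).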
            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
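                // Precompute each column's default value; columns absent from
                // the insert schema fall back to these.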
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                let table_col_names = table_col_names
                    .iter()
                    .enumerate()
                    .map(|(idx, name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals.into_iter())
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

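            // Reorder every incoming row to the table's column order and stamp
            // it with the current tick as an insertion (`diff = 1`).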
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}