use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use api::v1::flow::{
    flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};

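/// Shared reference to a [`FlowDualEngine`].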
pub type FlowDualEngineRef = Arc<FlowDualEngine>;

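/// A dual-mode flow engine that routes each flow to either the streaming
/// engine or the batching engine, depending on the flow's [`FlowType`], and
/// keeps both engines consistent with flow metadata.
///
/// A minimal wiring sketch (illustrative only; assumes the surrounding
/// crate's constructors are in scope):
///
/// ```ignore
/// let engine = Arc::new(FlowDualEngine::new(
///     streaming_engine,
///     batching_engine,
///     flow_metadata_manager,
///     catalog_manager,
///     plugins,
/// ));
/// engine.start_flow_consistent_check_task().await?;
/// ```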
pub struct FlowDualEngine {
    streaming_engine: Arc<StreamingEngine>,
    batching_engine: Arc<BatchingEngine>,
    /// Mapping from source table id to the flows reading from it, used to
    /// route incoming inserts to the proper engine.
    src_table2flow: RwLock<SrcTableToFlow>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    /// Background task that keeps engine state consistent with flow metadata.
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
    /// Set to `true` once all flows have been recovered after startup.
    done_recovering: AtomicBool,
}

impl FlowDualEngine {
    pub fn new(
        streaming_engine: Arc<StreamingEngine>,
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
            batching_engine,
            src_table2flow: RwLock::new(SrcTableToFlow::default()),
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
            done_recovering: AtomicBool::new(false),
        }
    }

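    /// Mark recovery as complete, allowing queued insert requests to proceed.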
    pub fn set_done_recovering(&self) {
        info!("FlowDualEngine done recovering");
        self.done_recovering
            .store(true, std::sync::atomic::Ordering::Release);
    }

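    /// Whether recovery of all flows has finished.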
    pub fn is_recover_done(&self) -> bool {
        self.done_recovering
            .load(std::sync::atomic::Ordering::Acquire)
    }

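    /// Block insert handling until recovery is done, retrying a few times
    /// before failing with a flow-not-recovered error.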
    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
        if self.is_recover_done() {
            return Ok(());
        }

        warn!(
            "FlowDualEngine is not done recovering, {} insert requests waiting for recovery",
            waiting_req_cnt
        );
        let mut retry = 0;
        let max_retry = 3;
        while retry < max_retry && !self.is_recover_done() {
            warn!(
                "FlowDualEngine is not done recovering, retry {} in 1s",
                retry
            );
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            retry += 1;
        }
        // Re-check the flag rather than the retry counter: recovery may have
        // completed during the final sleep.
        if !self.is_recover_done() {
            return FlowNotRecoveredSnafu.fail();
        }
        info!("FlowDualEngine is done recovering");
        Ok(())
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }

    pub fn batching_engine(&self) -> Arc<BatchingEngine> {
        self.batching_engine.clone()
    }

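    /// In distributed mode, poll the cluster until at least one frontend is
    /// reachable or `timeout` elapses; standalone mode returns immediately.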
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            let elapsed = now.elapsed();
            tokio::time::sleep(sleep_duration).await;
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
        }
    }

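    /// Trigger the consistency-check task and wait for one check round to
    /// finish, so that `flow_id` is reflected in the engines before returning.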
    async fn try_sync_with_check_task(
        &self,
        flow_id: FlowId,
        allow_drop: bool,
    ) -> Result<(), Error> {
        info!("Trying to sync with check task for flow {}", flow_id);
        let mut retry = 0;
        let max_retry = 10;
        // The check task may not have been started yet; retry until it shows up.
        while retry < max_retry {
            if let Some(task) = self.check_task.lock().await.as_ref() {
                task.trigger(false, allow_drop).await?;
                break;
            }
            retry += 1;
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        if retry == max_retry {
            error!(
                "Can't sync with check task for flow {} with allow_drop={}",
                flow_id, allow_drop
            );
            return SyncCheckTaskSnafu {
                flow_id,
                allow_drop,
            }
            .fail();
        }
        info!("Successfully synced with check task for flow {}", flow_id);

        Ok(())
    }

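    /// Compare the flows recorded in metadata against the flows actually
    /// running in the engines, recreating missing flows when `allow_create`
    /// is set and dropping stale ones when `allow_drop` is set.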
    async fn check_flow_consistent(
        &self,
        allow_create: bool,
        allow_drop: bool,
    ) -> Result<(), Error> {
        let nodeid = self.streaming_engine.node_id;
        let should_exists: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid.into())
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu {
                    id: Some(nodeid.into()),
                })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let should_exists = should_exists
            .into_iter()
            .map(|i| i as FlowId)
            .collect::<HashSet<_>>();
        let actual_exists = self.list_flows().await?.into_iter().collect::<HashSet<_>>();
        let to_be_created = should_exists
            .iter()
            .filter(|id| !actual_exists.contains(id))
            .collect::<Vec<_>>();
        let to_be_dropped = actual_exists
            .iter()
            .filter(|id| !should_exists.contains(id))
            .collect::<Vec<_>>();

        if !to_be_created.is_empty() {
            if allow_create {
                info!(
                    "Recovering {} flows: {:?}",
                    to_be_created.len(),
                    to_be_created
                );
                let mut errors = vec![];
                for flow_id in to_be_created.clone() {
                    let flow_id = *flow_id;
                    let info = self
                        .flow_metadata_manager
                        .flow_info_manager()
                        .get(flow_id as u32)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?
                        .context(FlowNotFoundSnafu { id: flow_id })?;

                    let sink_table_name = [
                        info.sink_table_name().catalog_name.clone(),
                        info.sink_table_name().schema_name.clone(),
                        info.sink_table_name().table_name.clone(),
                    ];
                    let args = CreateFlowArgs {
                        flow_id,
                        sink_table_name,
                        source_table_ids: info.source_table_ids().to_vec(),
                        create_if_not_exists: true,
                        or_replace: true,
                        expire_after: info.expire_after(),
                        comment: Some(info.comment().clone()),
                        sql: info.raw_sql().clone(),
                        flow_options: info.options().clone(),
                        query_ctx: info
                            .query_context()
                            .clone()
                            .map(|ctx| {
                                ctx.try_into()
                                    .map_err(BoxedError::new)
                                    .context(ExternalSnafu)
                            })
                            .transpose()?
                            .or_else(|| {
                                // Fall back to a default query context in the
                                // flow's catalog if none was recorded.
                                Some(
                                    QueryContextBuilder::default()
                                        .current_catalog(info.catalog_name().to_string())
                                        .build(),
                                )
                            }),
                    };
                    if let Err(err) = self
                        .create_flow(args)
                        .await
                        .map_err(BoxedError::new)
                        .with_context(|_| CreateFlowSnafu {
                            sql: info.raw_sql().clone(),
                        })
                    {
                        errors.push((flow_id, err));
                    }
                }
                if errors.is_empty() {
                    info!("Recover flows successfully, flows: {:?}", to_be_created);
                }

                for (flow_id, err) in errors {
                    warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flownode {:?} found flows in metadata that do not exist in the flownode, flow_ids={:?}",
                    nodeid, to_be_created
                );
            }
        }
        if !to_be_dropped.is_empty() {
            if allow_drop {
                info!("Dropping flows: {:?}", to_be_dropped);
                let mut errors = vec![];
                for flow_id in to_be_dropped {
                    let flow_id = *flow_id;
                    if let Err(err) = self.remove_flow(flow_id).await {
                        errors.push((flow_id, err));
                    }
                }
                for (flow_id, err) in errors {
                    warn!("Failed to drop flow {}, err={:#?}", flow_id, err);
                }
            } else {
                warn!(
                    "Flownode {:?} found flows in the flownode that no longer exist in metadata, flow_ids={:?}",
                    nodeid, to_be_dropped
                );
            }
        }
        Ok(())
    }

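    /// Spawn the background consistency-check task; fails if one is already
    /// running.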
    pub async fn start_flow_consistent_check_task(self: &Arc<Self>) -> Result<(), Error> {
        let mut check_task = self.check_task.lock().await;
        ensure!(
            check_task.is_none(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task already exists",
            }
        );
        let task = ConsistentCheckTask::start_check_task(self).await?;
        *check_task = Some(task);
        Ok(())
    }

    pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> {
        info!("Stopping flow consistent check task");
        let mut check_task = self.check_task.lock().await;

        ensure!(
            check_task.is_some(),
            IllegalCheckTaskStateSnafu {
                reason: "Flow consistent check task does not exist",
            }
        );

        check_task.take().unwrap().stop().await?;
        info!("Stopped flow consistent check task");
        Ok(())
    }

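    /// Whether `flow_id` is recorded in flow metadata.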
    async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_metadata_manager
            .flow_info_manager()
            .get(flow_id as u32)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
            .map(|info| info.is_some())
    }
}

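/// Background task that periodically reconciles engine state with flow
/// metadata, and can be triggered on demand via [`ConsistentCheckTask::trigger`].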
struct ConsistentCheckTask {
    handle: JoinHandle<()>,
    shutdown_tx: tokio::sync::mpsc::Sender<()>,
    trigger_tx: tokio::sync::mpsc::Sender<(bool, bool, tokio::sync::oneshot::Sender<()>)>,
}

impl ConsistentCheckTask {
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        let engine = engine.clone();
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // Wait for a frontend to show up before recovering flows, since
            // recovery may need to reach one.
            if let Err(err) = engine
                .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // The first round recovers flows from metadata, retrying until it succeeds.
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    MIN_REFRESH_DURATION.as_secs()
                );
                tokio::time::sleep(MIN_REFRESH_DURATION).await;
            }

            engine.set_done_recovering();

            // Subsequent rounds only observe by default; create/drop is
            // enabled per-round by explicit triggers.
            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistency");
                }
                // Notify the trigger (if any) that its check round is done.
                if let Some(done) = ret_signal.take() {
                    let _ = done.send(());
                }
                tokio::select! {
                    _ = rx.recv() => break,
                    incoming = trigger_rx.recv() => if let Some(incoming) = incoming {
                        (allow_create, allow_drop) = (incoming.0, incoming.1);
                        ret_signal = Some(incoming.2);
                    },
                    _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                        (allow_create, allow_drop) = (false, false);
                    },
                }
            }
        });
        Ok(ConsistentCheckTask {
            handle,
            shutdown_tx: tx,
            trigger_tx,
        })
    }

    async fn trigger(&self, allow_create: bool, allow_drop: bool) -> Result<(), Error> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.trigger_tx
            .send((allow_create, allow_drop, tx))
            .await
            .map_err(|_| {
                IllegalCheckTaskStateSnafu {
                    reason: "Failed to send trigger signal",
                }
                .build()
            })?;
        rx.await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to receive trigger signal",
            }
            .build()
        })?;
        Ok(())
    }

    async fn stop(self) -> Result<(), Error> {
        self.shutdown_tx.send(()).await.map_err(|_| {
            IllegalCheckTaskStateSnafu {
                reason: "Failed to send shutdown signal",
            }
            .build()
        })?;
        // The loop exits on the shutdown signal; abort as a belt-and-braces measure.
        self.handle.abort();
        Ok(())
    }
}

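/// Index from source table id to the flows that consume it, kept separately
/// for streaming and batching flows so inserts can be routed per engine.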
#[derive(Default)]
struct SrcTableToFlow {
    stream: HashMap<TableId, HashSet<FlowId>>,
    batch: HashMap<TableId, HashSet<FlowId>>,
    flow_infos: HashMap<FlowId, (FlowType, Vec<TableId>)>,
}

impl SrcTableToFlow {
    fn in_stream(&self, table_id: TableId) -> bool {
        self.stream.contains_key(&table_id)
    }

    fn in_batch(&self, table_id: TableId) -> bool {
        self.batch.contains_key(&table_id)
    }

    fn add_flow(&mut self, flow_id: FlowId, flow_type: FlowType, src_table_ids: Vec<TableId>) {
        let mapping = match flow_type {
            FlowType::Streaming => &mut self.stream,
            FlowType::Batching => &mut self.batch,
        };

        for src_table in &src_table_ids {
            mapping.entry(*src_table).or_default().insert(flow_id);
        }
        self.flow_infos.insert(flow_id, (flow_type, src_table_ids));
    }

    fn remove_flow(&mut self, flow_id: FlowId) {
        let mapping = match self.get_flow_type(flow_id) {
            Some(FlowType::Streaming) => &mut self.stream,
            Some(FlowType::Batching) => &mut self.batch,
            None => return,
        };
        if let Some((_, src_table_ids)) = self.flow_infos.remove(&flow_id) {
            for src_table in src_table_ids {
                if let Some(flows) = mapping.get_mut(&src_table) {
                    flows.remove(&flow_id);
                }
            }
        }
    }

    fn get_flow_type(&self, flow_id: FlowId) -> Option<FlowType> {
        self.flow_infos
            .get(&flow_id)
            .map(|(flow_type, _)| flow_type)
            .cloned()
    }
}

impl FlowEngine for FlowDualEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let flow_type = args
            .flow_options
            .get(FlowType::FLOW_TYPE_KEY)
            .map(|s| s.as_str());

        let flow_type = match flow_type {
            Some(FlowType::BATCHING) => FlowType::Batching,
            Some(FlowType::STREAMING) => FlowType::Streaming,
            // Batching is the default when no flow type is specified.
            None => FlowType::Batching,
            Some(flow_type) => {
                return InternalSnafu {
                    reason: format!("Invalid flow type: {}", flow_type),
                }
                .fail()
            }
        };

        let flow_id = args.flow_id;
        let src_table_ids = args.source_table_ids.clone();

        let res = match flow_type {
            FlowType::Batching => self.batching_engine.create_flow(args).await,
            FlowType::Streaming => self.streaming_engine.create_flow(args).await,
        }?;

        self.src_table2flow
            .write()
            .await
            .add_flow(flow_id, flow_type, src_table_ids);

        Ok(res)
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);

        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
            None => {
                warn!(
                    "Flow {} does not exist in the underlying engine but exists in metadata",
                    flow_id
                );
                // Let the check task reconcile, allowing it to drop stale flows.
                self.try_sync_with_check_task(flow_id, true).await?;

                Ok(())
            }
        }?;
        self.src_table2flow.write().await.remove_flow(flow_id);
        Ok(())
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        // Make sure the flow is fully created in the engine before flushing.
        self.try_sync_with_check_task(flow_id, false).await?;
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
            None => {
                warn!(
                    "Currently flow={flow_id} doesn't exist in flownode, ignoring flush_flow request"
                );
                Ok(0)
            }
        }
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flow_exist(flow_id).await,
            None => Ok(false),
        }
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        let stream_flows = self.streaming_engine.list_flows().await?;
        let batch_flows = self.batching_engine.list_flows().await?;

        Ok(stream_flows.into_iter().chain(batch_flows))
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.wait_for_all_flow_recover(request.requests.len())
            .await?;
        // Split requests by which engine's flows read from the written table.
        // A table can be a source for both engines, so a request may go to both.
        let mut to_stream_engine = Vec::with_capacity(request.requests.len());
        let mut to_batch_engine = request.requests;

        {
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
                let table_id = region_id.table_id();
                let is_in_stream = src_table2flow.in_stream(table_id);
                let is_in_batch = src_table2flow.in_batch(table_id);
                if is_in_stream {
                    to_stream_engine.push(req.clone());
                }
                if is_in_batch {
                    return true;
                }
                if !is_in_stream {
                    warn!("Table {} is not a source table of any flow", table_id);
                }
                false
            });
        }

        // Forward to the streaming engine concurrently while the batching
        // engine handles its share on the current task.
        let streaming_engine = self.streaming_engine.clone();
        let stream_handler: JoinHandle<Result<(), Error>> =
            common_runtime::spawn_global(async move {
                streaming_engine
                    .handle_flow_inserts(api::v1::region::InsertRequests {
                        requests: to_stream_engine,
                    })
                    .await?;
                Ok(())
            });
        self.batching_engine
            .handle_flow_inserts(api::v1::region::InsertRequests {
                requests: to_batch_engine,
            })
            .await?;
        stream_handler.await.context(JoinTaskSnafu)??;

        Ok(())
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for FlowDualEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        FlowEngine::handle_flow_inserts(self, request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }
}

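/// Build a converter from a flow error to a `common_meta` external error,
/// recording the caller's source location.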
fn to_meta_err(
    location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
    move |err: crate::error::Error| -> common_meta::error::Error {
        common_meta::error::Error::External {
            location,
            source: BoxedError::new(err),
        }
    }
}

#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
    async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|h| h.query_context)
            .map(|ctx| ctx.into());
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_after,
                comment,
                sql,
                flow_options,
                or_replace,
            })) => {
                let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let expire_after = expire_after.map(|e| e.value);
                let args = CreateFlowArgs {
                    flow_id: task_id.id as u64,
                    sink_table_name,
                    source_table_ids,
                    create_if_not_exists,
                    or_replace,
                    expire_after,
                    comment: Some(comment),
                    sql: sql.clone(),
                    flow_options,
                    query_ctx,
                };
                let ret = self
                    .create_flow(args)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.inc();
                Ok(FlowResponse {
                    affected_flows: ret
                        .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                        .into_iter()
                        .collect_vec(),
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                METRIC_FLOW_TASK_COUNT.dec();
                Ok(Default::default())
            }
            Some(flow_request::Body::Flush(FlushFlow {
                flow_id: Some(flow_id),
            })) => {
                let row = self
                    .flush_flow_inner(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err(snafu::location!()))?;
                Ok(FlowResponse {
                    affected_flows: vec![flow_id],
                    affected_rows: row as u64,
                    ..Default::default()
                })
            }
            other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
        }
    }

    async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
        self.handle_inserts_inner(request)
            .await
            .map(|_| Default::default())
            .map_err(to_meta_err(snafu::location!()))
    }
}

impl FlowEngine for StreamingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        self.flow_exist_inner(flow_id).await
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self
            .flow_err_collectors
            .read()
            .await
            .keys()
            .cloned()
            .collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
}

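/// How to obtain each destination column when reordering an incoming insert
/// row: take it from the row at a given index, or use the column's default value.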
#[derive(Debug, Clone)]
enum FetchFromRow {
    Idx(usize),
    Default(Value),
}

impl FetchFromRow {
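    /// Fetch a [`Value`] from the row according to this instruction.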
    fn fetch(&self, row: &repr::Row) -> Value {
        match self {
            // Safe to unwrap: the index was validated against the insert
            // schema when the fetch order was built.
            FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

impl StreamingEngine {
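    /// Handle insert requests by reordering each incoming row into the
    /// table's column order (filling defaults for missing columns) before
    /// passing the rows on as [`DiffRow`]s.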
    async fn handle_inserts_inner(
        &self,
        request: InsertRequests,
    ) -> std::result::Result<(), Error> {
        // Best-effort: hold a read guard so a concurrent flush (which takes
        // the write lock) does not interleave; proceed anyway if unavailable.
        let _flush_lock = self.flush_lock.try_read();
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let table_id = RegionId::from(region_id).table_id();

            let (insert_schema, rows_proto) = write_request
                .rows
                .map(|r| (r.schema, r.rows))
                .unwrap_or_default();

            let now = self.tick_manager.tick();

            let (table_types, fetch_order) = {
                let ctx = self.node_context.read().await;

                let table_schema = ctx.table_source.table_from_id(&table_id).await?;
                let default_vals = table_schema
                    .default_values
                    .iter()
                    .zip(table_schema.relation_desc.typ().column_types.iter())
                    .map(|(v, ty)| {
                        v.as_ref().and_then(|v| {
                            match v.create_default(ty.scalar_type(), ty.nullable()) {
                                Ok(v) => Some(v),
                                Err(err) => {
                                    common_telemetry::error!(err; "Failed to create default value");
                                    None
                                }
                            }
                        })
                    })
                    .collect_vec();

                let table_types = table_schema
                    .relation_desc
                    .typ()
                    .column_types
                    .clone()
                    .into_iter()
                    .map(|t| t.scalar_type)
                    .collect_vec();
                let table_col_names = table_schema.relation_desc.names;
                let table_col_names = table_col_names
                    .iter()
                    .enumerate()
                    .map(|(idx, name)| match name {
                        Some(name) => Ok(name.clone()),
                        None => InternalSnafu {
                            reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
                        }
                        .fail(),
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let name_to_col = HashMap::<_, _>::from_iter(
                    insert_schema
                        .iter()
                        .enumerate()
                        .map(|(i, name)| (&name.column_name, i)),
                );

                // For each table column, locate it in the insert schema, or
                // fall back to its default value when the insert omits it.
                let fetch_order: Vec<FetchFromRow> = table_col_names
                    .iter()
                    .zip(default_vals.into_iter())
                    .map(|(col_name, col_default_val)| {
                        name_to_col
                            .get(col_name)
                            .copied()
                            .map(FetchFromRow::Idx)
                            .or_else(|| col_default_val.clone().map(FetchFromRow::Default))
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Column not found: {}, default_value: {:?}",
                                    col_name, col_default_val
                                ),
                            })
                    })
                    .try_collect()?;

                trace!("Reordering columns: {:?}", fetch_order);
                (table_types, fetch_order)
            };

            // Reorder every row and stamp it with the current tick as an insert (+1 diff).
            let rows: Vec<DiffRow> = rows_proto
                .into_iter()
                .map(|r| {
                    let r = repr::Row::from(r);
                    let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
                    repr::Row::new(reordered)
                })
                .map(|r| (r, now, 1))
                .collect_vec();
            if let Err(err) = self
                .handle_write_request(region_id.into(), rows, &table_types)
                .await
            {
                let err = BoxedError::new(err);
                let flow_ids = self
                    .node_context
                    .read()
                    .await
                    .get_flow_ids(table_id)
                    .into_iter()
                    .flatten()
                    .cloned()
                    .collect_vec();
                let err = InsertIntoFlowSnafu {
                    region_id,
                    flow_ids,
                }
                .into_error(err);
                common_telemetry::error!(err; "Failed to handle write request");
                return Err(err);
            }
        }
        Ok(())
    }
}