1use std::net::SocketAddr;
18use std::sync::Arc;
19
20use api::v1::flow::DirtyWindowRequests;
21use api::v1::{RowDeleteRequests, RowInsertRequests};
22use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_error::ext::BoxedError;
26use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
27use common_meta::key::TableMetadataManagerRef;
28use common_meta::key::flow::FlowMetadataManagerRef;
29use common_meta::kv_backend::KvBackendRef;
30use common_meta::node_manager::{Flownode, NodeManagerRef};
31use common_meta::procedure_executor::ProcedureExecutorRef;
32use common_query::Output;
33use common_runtime::JoinHandle;
34use common_telemetry::tracing::info;
35use futures::TryStreamExt;
36use greptime_proto::v1::flow::{FlowRequest, FlowResponse, InsertRequests, flow_server};
37use itertools::Itertools;
38use operator::delete::Deleter;
39use operator::insert::Inserter;
40use operator::statement::StatementExecutor;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use query::{QueryEngine, QueryEngineFactory};
44use servers::add_service;
45use servers::grpc::builder::GrpcServerBuilder;
46use servers::grpc::{GrpcServer, GrpcServerConfig};
47use servers::http::HttpServerBuilder;
48use servers::metrics_handler::MetricsHandler;
49use servers::server::{ServerHandler, ServerHandlers};
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt};
52use tokio::sync::{Mutex, broadcast, oneshot};
53use tonic::codec::CompressionEncoding;
54use tonic::{Request, Response, Status};
55
56use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
57use crate::adapter::{FlowStreamingEngineRef, create_worker};
58use crate::batching_mode::engine::BatchingEngine;
59use crate::error::{
60 CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
61 ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
62};
63use crate::heartbeat::HeartbeatTask;
64use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
65use crate::transform::register_function_to_query_engine;
66use crate::utils::{SizeReportSender, StateReportHandler};
67use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
68
69pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// gRPC-facing flow service: a thin, cloneable wrapper around the dual
/// (streaming + batching) flow engine that implements `flow_server::Flow`.
#[derive(Clone)]
pub struct FlowService {
    /// Engine that actually handles flow DDL, mirrored inserts and
    /// dirty-window marking.
    pub dual_engine: FlowDualEngineRef,
}
75
76impl FlowService {
77 pub fn new(manager: FlowDualEngineRef) -> Self {
78 Self {
79 dual_engine: manager,
80 }
81 }
82}
83
84#[async_trait::async_trait]
85impl flow_server::Flow for FlowService {
86 async fn handle_create_remove(
87 &self,
88 request: Request<FlowRequest>,
89 ) -> Result<Response<FlowResponse>, Status> {
90 let _timer = METRIC_FLOW_PROCESSING_TIME
91 .with_label_values(&["ddl"])
92 .start_timer();
93
94 let request = request.into_inner();
95 self.dual_engine
96 .handle(request)
97 .await
98 .map_err(|err| {
99 common_telemetry::error!(err; "Failed to handle flow request");
100 err
101 })
102 .map(Response::new)
103 .map_err(to_status_with_last_err)
104 }
105
106 async fn handle_mirror_request(
107 &self,
108 request: Request<InsertRequests>,
109 ) -> Result<Response<FlowResponse>, Status> {
110 let _timer = METRIC_FLOW_PROCESSING_TIME
111 .with_label_values(&["insert"])
112 .start_timer();
113
114 let request = request.into_inner();
115 let mut row_count = 0;
117 let request = api::v1::region::InsertRequests {
118 requests: request
119 .requests
120 .into_iter()
121 .map(|insert| {
122 insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
123 api::v1::region::InsertRequest {
124 region_id: insert.region_id,
125 rows: insert.rows,
126 partition_expr_version: insert.partition_expr_version,
127 }
128 })
129 .collect_vec(),
130 };
131
132 METRIC_FLOW_ROWS
133 .with_label_values(&["in"])
134 .inc_by(row_count as u64);
135
136 self.dual_engine
137 .handle_inserts(request)
138 .await
139 .map(Response::new)
140 .map_err(to_status_with_last_err)
141 }
142
143 async fn handle_mark_dirty_time_window(
144 &self,
145 reqs: Request<DirtyWindowRequests>,
146 ) -> Result<Response<FlowResponse>, Status> {
147 self.dual_engine
148 .handle_mark_window_dirty(reqs.into_inner())
149 .await
150 .map(Response::new)
151 .map_err(to_status_with_last_err)
152 }
153}
154
/// Shared handle to the flownode server; cheap to clone because all state
/// lives behind a single `Arc`.
#[derive(Clone)]
pub struct FlownodeServer {
    inner: Arc<FlownodeServerInner>,
}
159
/// Interior state of [`FlownodeServer`], guarded by async mutexes.
struct FlownodeServerInner {
    // Broadcast sender used to signal background flow workers to stop.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    // Broadcast sender reserved for signalling server shutdown.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    // Join handle of the streaming engine's background loop, if running.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    // Join handle of the state report task; spawned at most once (see
    // `start_workers`).
    state_report_task_handler: Mutex<Option<JoinHandle<()>>>,
    // The gRPC-facing flow service wrapping the dual engine.
    flow_service: FlowService,
}
174
175impl FlownodeServer {
176 pub fn new(flow_service: FlowService) -> Self {
177 let (tx, _rx) = broadcast::channel::<()>(1);
178 let (server_tx, _server_rx) = broadcast::channel::<()>(1);
179 Self {
180 inner: Arc::new(FlownodeServerInner {
181 flow_service,
182 worker_shutdown_tx: Mutex::new(tx),
183 server_shutdown_tx: Mutex::new(server_tx),
184 streaming_task_handler: Mutex::new(None),
185 state_report_task_handler: Mutex::new(None),
186 }),
187 }
188 }
189
190 async fn start_workers(&self) -> Result<(), Error> {
194 let manager_ref = self.inner.flow_service.dual_engine.clone();
195 let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
196 if state_report_task_handler.is_none() {
197 *state_report_task_handler = manager_ref.clone().start_state_report_task().await;
198 }
199 drop(state_report_task_handler);
200 let handle = manager_ref
201 .streaming_engine()
202 .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
203 self.inner
204 .streaming_task_handler
205 .lock()
206 .await
207 .replace(handle);
208
209 self.inner
210 .flow_service
211 .dual_engine
212 .start_flow_consistent_check_task()
213 .await?;
214
215 Ok(())
216 }
217
218 async fn stop_workers(&self) -> Result<(), Error> {
220 let tx = self.inner.worker_shutdown_tx.lock().await;
221 if tx.send(()).is_err() {
222 info!("Receiver dropped, the flow node server has already shutdown");
223 }
224 self.inner
227 .flow_service
228 .dual_engine
229 .stop_flow_consistent_check_task()
230 .await?;
231 Ok(())
232 }
233}
234
impl FlownodeServer {
    /// Builds the tonic service for the flow gRPC API, accepting and sending
    /// both gzip- and zstd-compressed messages.
    pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
        flow_server::FlowServer::new(self.inner.flow_service.clone())
            .accept_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Zstd)
    }
}
244
/// A running flownode: the server, its network services, and an optional
/// heartbeat task toward the metasrv.
pub struct FlownodeInstance {
    flownode_server: FlownodeServer,
    // Network services (gRPC/HTTP); empty until `setup_services` is called.
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
}
251
252impl FlownodeInstance {
253 pub async fn start(&mut self) -> Result<(), crate::Error> {
254 if let Some(task) = &self.heartbeat_task {
255 task.start().await?;
256 }
257
258 self.flownode_server.start_workers().await?;
259
260 self.services.start_all().await.context(StartServerSnafu)?;
261
262 Ok(())
263 }
264 pub async fn shutdown(&mut self) -> Result<(), Error> {
265 self.services
266 .shutdown_all()
267 .await
268 .context(ShutdownServerSnafu)?;
269
270 self.flownode_server.stop_workers().await?;
271
272 if let Some(task) = &self.heartbeat_task {
273 task.shutdown();
274 }
275
276 Ok(())
277 }
278
279 pub fn flownode_server(&self) -> &FlownodeServer {
280 &self.flownode_server
281 }
282
283 pub fn flow_engine(&self) -> FlowDualEngineRef {
284 self.flownode_server.inner.flow_service.dual_engine.clone()
285 }
286
287 pub fn setup_services(&mut self, services: ServerHandlers) {
288 self.services = services;
289 }
290}
291
292pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
293 if let Some(user_provider) = fn_opts.user_provider.as_ref() {
294 let static_provider = auth::static_user_provider_from_option(user_provider)
295 .context(IllegalAuthConfigSnafu)?;
296
297 let (usr, pwd) = static_provider
298 .get_one_user_pwd()
299 .context(IllegalAuthConfigSnafu)?;
300 let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
301 return Ok(Some(auth_header));
302 }
303
304 Ok(None)
305}
306
/// Builder assembling a [`FlownodeInstance`] from its shared components.
pub struct FlownodeBuilder {
    opts: FlownodeOptions,
    plugins: Plugins,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    // Set together by `with_heartbeat_task`; both are `None` otherwise.
    heartbeat_task: Option<HeartbeatTask>,
    state_report_handler: Option<StateReportHandler>,
    frontend_client: Arc<FrontendClient>,
}
319
impl FlownodeBuilder {
    /// Creates a builder from the required shared components; the heartbeat
    /// task and state report handler stay unset until
    /// [`Self::with_heartbeat_task`] is called.
    pub fn new(
        opts: FlownodeOptions,
        plugins: Plugins,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        frontend_client: Arc<FrontendClient>,
    ) -> Self {
        Self {
            opts,
            plugins,
            table_meta,
            catalog_manager,
            flow_metadata_manager,
            heartbeat_task: None,
            state_report_handler: None,
            frontend_client,
        }
    }

    /// Attaches a heartbeat task, wiring a size-report channel between the
    /// heartbeat task (sender side) and the flow engine (receiver side).
    pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
        let (sender, receiver) = SizeReportSender::new();
        Self {
            heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
            state_report_handler: Some(receiver),
            ..self
        }
    }

    /// Builds the flownode instance: query engine, streaming engine, batching
    /// engine, and the dual engine fronting both.
    ///
    /// The returned instance has no services installed yet; call
    /// `setup_services` before `start`.
    pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            self.catalog_manager.clone(),
            None,
            None,
            None,
            None,
            None,
            false,
            Default::default(),
            self.opts.query.clone(),
        );
        // Streaming engine with its worker threads (see `build_manager`).
        let manager = Arc::new(
            self.build_manager(query_engine_factory.query_engine())
                .await?,
        );
        let batching = Arc::new(BatchingEngine::new(
            self.frontend_client.clone(),
            query_engine_factory.query_engine(),
            self.flow_metadata_manager.clone(),
            self.table_meta.clone(),
            self.catalog_manager.clone(),
            self.opts.flow.batching_mode.clone(),
        ));
        let dual = Arc::new(FlowDualEngine::new(
            manager.clone(),
            batching,
            self.flow_metadata_manager.clone(),
            self.catalog_manager.clone(),
            self.plugins.clone(),
        ));
        // Forward the heartbeat's size-report channel to the dual engine,
        // if `with_heartbeat_task` was used.
        if let Some(handler) = self.state_report_handler.take() {
            dual.set_state_report_handler(handler).await;
        }

        let server = FlownodeServer::new(FlowService::new(dual));

        let heartbeat_task = self.heartbeat_task;

        let instance = FlownodeInstance {
            flownode_server: server,
            services: ServerHandlers::default(),
            heartbeat_task,
        };
        Ok(instance)
    }

    /// Builds the streaming engine and spawns one OS thread per configured
    /// worker; each worker sends its handle back over a oneshot channel.
    async fn build_manager(
        &mut self,
        query_engine: Arc<dyn QueryEngine>,
    ) -> Result<StreamingEngine, Error> {
        let table_meta = self.table_meta.clone();

        register_function_to_query_engine(&query_engine);

        let num_workers = self.opts.flow.num_workers;

        let node_id = self.opts.node_id.map(|id| id as u32);

        let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
        for worker_id in 0..num_workers {
            let (tx, rx) = oneshot::channel();

            // The worker runs its own blocking loop, hence a dedicated OS
            // thread rather than a tokio task.
            let _handle = std::thread::Builder::new()
                .name(format!("flow-worker-{}", worker_id))
                .spawn(move || {
                    let (handle, mut worker) = create_worker();
                    let _ = tx.send(handle);
                    info!("Flow Worker started in new thread");
                    worker.run();
                });
            let worker_handle = rx.await.map_err(|e| {
                UnexpectedSnafu {
                    reason: format!("Failed to receive worker handle: {}", e),
                }
                .build()
            })?;
            man.add_worker_handle(worker_handle);
        }
        info!("Flow Node Manager started");
        Ok(man)
    }
}
438
/// Builder for the flownode's network services (gRPC, and optionally an
/// HTTP metrics endpoint), borrowing the options it reads.
pub struct FlownodeServiceBuilder<'a> {
    opts: &'a FlownodeOptions,
    grpc_server: Option<GrpcServer>,
    enable_http_service: bool,
}
445
446impl<'a> FlownodeServiceBuilder<'a> {
447 pub fn new(opts: &'a FlownodeOptions) -> Self {
448 Self {
449 opts,
450 grpc_server: None,
451 enable_http_service: false,
452 }
453 }
454
455 pub fn enable_http_service(self) -> Self {
456 Self {
457 enable_http_service: true,
458 ..self
459 }
460 }
461
462 pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
463 Self {
464 grpc_server: Some(grpc_server),
465 ..self
466 }
467 }
468
469 pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
470 let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
471 self.grpc_server = Some(grpc_server);
472 self
473 }
474
475 pub fn build(mut self) -> Result<ServerHandlers, Error> {
476 let handlers = ServerHandlers::default();
477 if let Some(grpc_server) = self.grpc_server.take() {
478 let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
479 addr: &self.opts.grpc.bind_addr,
480 })?;
481 let handler: ServerHandler = (Box::new(grpc_server), addr);
482 handlers.insert(handler);
483 }
484
485 if self.enable_http_service {
486 let http_server = HttpServerBuilder::new(self.opts.http.clone())
487 .with_metrics_handler(MetricsHandler)
488 .build();
489 let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
490 addr: &self.opts.http.addr,
491 })?;
492 let handler: ServerHandler = (Box::new(http_server), addr);
493 handlers.insert(handler);
494 }
495 Ok(handlers)
496 }
497
498 pub fn grpc_server_builder(
499 opts: &FlownodeOptions,
500 flownode_server: &FlownodeServer,
501 ) -> GrpcServerBuilder {
502 let config = GrpcServerConfig {
503 max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
504 max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
505 tls: opts.grpc.tls.clone(),
506 max_connection_age: opts.grpc.max_connection_age,
507 };
508 let service = flownode_server.create_flow_service();
509 let runtime = common_runtime::global_runtime();
510 let mut builder = GrpcServerBuilder::new(config, runtime);
511 add_service!(builder, service);
512 builder
513 }
514}
515
/// Invoker the flow engine uses to write results back through the frontend
/// write path (insert/delete/statement execution).
#[derive(Clone)]
pub struct FrontendInvoker {
    inserter: Arc<Inserter>,
    deleter: Arc<Deleter>,
    statement_executor: Arc<StatementExecutor>,
}
526
527impl FrontendInvoker {
528 pub fn new(
529 inserter: Arc<Inserter>,
530 deleter: Arc<Deleter>,
531 statement_executor: Arc<StatementExecutor>,
532 ) -> Self {
533 Self {
534 inserter,
535 deleter,
536 statement_executor,
537 }
538 }
539
540 pub async fn build_from(
541 flow_streaming_engine: FlowStreamingEngineRef,
542 catalog_manager: CatalogManagerRef,
543 kv_backend: KvBackendRef,
544 layered_cache_registry: LayeredCacheRegistryRef,
545 procedure_executor: ProcedureExecutorRef,
546 node_manager: NodeManagerRef,
547 ) -> Result<FrontendInvoker, Error> {
548 let table_route_cache: TableRouteCacheRef =
549 layered_cache_registry.get().context(CacheRequiredSnafu {
550 name: TABLE_ROUTE_CACHE_NAME,
551 })?;
552 let partition_info_cache: PartitionInfoCacheRef =
553 layered_cache_registry.get().context(CacheRequiredSnafu {
554 name: PARTITION_INFO_CACHE_NAME,
555 })?;
556
557 let partition_manager = Arc::new(PartitionRuleManager::new(
558 kv_backend.clone(),
559 table_route_cache.clone(),
560 partition_info_cache.clone(),
561 ));
562
563 let table_flownode_cache: TableFlownodeSetCacheRef =
564 layered_cache_registry.get().context(CacheRequiredSnafu {
565 name: TABLE_FLOWNODE_SET_CACHE_NAME,
566 })?;
567
568 let inserter = Arc::new(Inserter::new(
569 catalog_manager.clone(),
570 partition_manager.clone(),
571 node_manager.clone(),
572 table_flownode_cache,
573 ));
574
575 let deleter = Arc::new(Deleter::new(
576 catalog_manager.clone(),
577 partition_manager.clone(),
578 node_manager.clone(),
579 ));
580
581 let query_engine = flow_streaming_engine.query_engine.clone();
582
583 let statement_executor = Arc::new(StatementExecutor::new(
584 catalog_manager.clone(),
585 query_engine.clone(),
586 procedure_executor.clone(),
587 kv_backend.clone(),
588 layered_cache_registry.clone(),
589 inserter.clone(),
590 partition_manager,
591 None,
592 ));
593
594 let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
595 Ok(invoker)
596 }
597}
598
599impl FrontendInvoker {
600 pub async fn row_inserts(
601 &self,
602 requests: RowInsertRequests,
603 ctx: QueryContextRef,
604 ) -> common_frontend::error::Result<Output> {
605 let _timer = METRIC_FLOW_PROCESSING_TIME
606 .with_label_values(&["output_insert"])
607 .start_timer();
608
609 self.inserter
610 .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
611 .await
612 .map_err(BoxedError::new)
613 .context(common_frontend::error::ExternalSnafu)
614 }
615
616 pub async fn row_deletes(
617 &self,
618 requests: RowDeleteRequests,
619 ctx: QueryContextRef,
620 ) -> common_frontend::error::Result<Output> {
621 let _timer = METRIC_FLOW_PROCESSING_TIME
622 .with_label_values(&["output_delete"])
623 .start_timer();
624
625 self.deleter
626 .handle_row_deletes(requests, ctx)
627 .await
628 .map_err(BoxedError::new)
629 .context(common_frontend::error::ExternalSnafu)
630 }
631
632 pub fn statement_executor(&self) -> Arc<StatementExecutor> {
633 self.statement_executor.clone()
634 }
635}
636
637pub(crate) async fn get_all_flow_ids(
639 flow_metadata_manager: &FlowMetadataManagerRef,
640 catalog_manager: &CatalogManagerRef,
641 nodeid: Option<u64>,
642) -> Result<Vec<u32>, Error> {
643 let ret = if let Some(nodeid) = nodeid {
644 let flow_ids_one_node = flow_metadata_manager
645 .flownode_flow_manager()
646 .flows(nodeid)
647 .try_collect::<Vec<_>>()
648 .await
649 .context(ListFlowsSnafu { id: Some(nodeid) })?;
650 flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
651 } else {
652 let all_catalogs = catalog_manager
653 .catalog_names()
654 .await
655 .map_err(BoxedError::new)
656 .context(ExternalSnafu)?;
657 let mut all_flow_ids = vec![];
658 for catalog in all_catalogs {
659 let flows = flow_metadata_manager
660 .flow_name_manager()
661 .flow_names(&catalog)
662 .await
663 .try_collect::<Vec<_>>()
664 .await
665 .map_err(BoxedError::new)
666 .context(ExternalSnafu)?;
667
668 all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
669 }
670 all_flow_ids
671 };
672
673 Ok(ret)
674}
675
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use catalog::memory::new_memory_catalog_manager;
    use common_base::Plugins;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use query::options::QueryOptions;

    use super::*;
    use crate::adapter::flownode_impl::FlowDualEngine;
    use crate::batching_mode::BatchingModeOptions;
    use crate::batching_mode::engine::BatchingEngine;
    use crate::utils::SizeReportSender;

    /// Builds a `FlownodeServer` backed entirely by in-memory stores, plus
    /// the size-report sender wired to the dual engine's report handler.
    async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        table_meta.init().await.unwrap();
        let flow_meta = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let catalog_manager = new_memory_catalog_manager().unwrap();
        let query_engine = crate::test_utils::create_test_query_engine();

        let streaming_engine = Arc::new(StreamingEngine::new(
            None,
            query_engine.clone(),
            table_meta.clone(),
        ));
        let (frontend_client, _handler) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        let batching_engine = Arc::new(BatchingEngine::new(
            Arc::new(frontend_client),
            query_engine,
            flow_meta.clone(),
            table_meta,
            catalog_manager.clone(),
            BatchingModeOptions::default(),
        ));
        let dual_engine = Arc::new(FlowDualEngine::new(
            streaming_engine,
            batching_engine,
            flow_meta,
            catalog_manager,
            Plugins::new(),
        ));

        let (report_sender, report_handler) = SizeReportSender::new();
        dual_engine.set_state_report_handler(report_handler).await;

        let server = FlownodeServer::new(FlowService::new(dual_engine));
        (server, report_sender)
    }

    /// The state report task is spawned at most once, so report queries must
    /// keep succeeding across a full worker stop/start cycle.
    #[tokio::test]
    async fn test_state_report_handler_survives_worker_restart() {
        let (server, report_sender) = new_test_flownode_server().await;

        server.start_workers().await.unwrap();
        report_sender.query(Duration::from_secs(3)).await.unwrap();

        server.stop_workers().await.unwrap();
        report_sender.query(Duration::from_secs(3)).await.unwrap();

        // Restart: state reporting must still answer after workers come back.
        server.start_workers().await.unwrap();
        report_sender.query(Duration::from_secs(3)).await.unwrap();

        server.stop_workers().await.unwrap();
    }
}