1use std::net::SocketAddr;
18use std::sync::Arc;
19
20use api::v1::flow::DirtyWindowRequests;
21use api::v1::{RowDeleteRequests, RowInsertRequests};
22use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_error::ext::BoxedError;
26use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
27use common_meta::key::TableMetadataManagerRef;
28use common_meta::key::flow::FlowMetadataManagerRef;
29use common_meta::kv_backend::KvBackendRef;
30use common_meta::node_manager::{Flownode, NodeManagerRef};
31use common_meta::procedure_executor::ProcedureExecutorRef;
32use common_query::Output;
33use common_runtime::JoinHandle;
34use common_telemetry::tracing::info;
35use futures::TryStreamExt;
36use greptime_proto::v1::flow::{FlowRequest, FlowResponse, InsertRequests, flow_server};
37use itertools::Itertools;
38use operator::delete::Deleter;
39use operator::insert::Inserter;
40use operator::statement::StatementExecutor;
41use partition::manager::PartitionRuleManager;
42use query::{QueryEngine, QueryEngineFactory};
43use servers::add_service;
44use servers::grpc::builder::GrpcServerBuilder;
45use servers::grpc::{GrpcServer, GrpcServerConfig};
46use servers::http::HttpServerBuilder;
47use servers::metrics_handler::MetricsHandler;
48use servers::server::{ServerHandler, ServerHandlers};
49use session::context::QueryContextRef;
50use snafu::{OptionExt, ResultExt};
51use tokio::sync::{Mutex, broadcast, oneshot};
52use tonic::codec::CompressionEncoding;
53use tonic::{Request, Response, Status};
54
55use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
56use crate::adapter::{FlowStreamingEngineRef, create_worker};
57use crate::batching_mode::engine::BatchingEngine;
58use crate::error::{
59 CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
60 ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
61};
62use crate::heartbeat::HeartbeatTask;
63use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
64use crate::transform::register_function_to_query_engine;
65use crate::utils::{SizeReportSender, StateReportHandler};
66use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
67
/// Name string for the flownode server.
// NOTE(review): not referenced in the visible part of this file; presumably used as the
// identifier under which the flownode server is registered or looked up — confirm at callers.
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// Service object backing the flownode's `flow_server::Flow` gRPC endpoints.
///
/// It is a thin, cloneable wrapper: every request it receives is handed to `dual_engine`.
#[derive(Clone)]
pub struct FlowService {
    /// Shared handle to the engine that actually processes flow requests.
    pub dual_engine: FlowDualEngineRef,
}
74
75impl FlowService {
76 pub fn new(manager: FlowDualEngineRef) -> Self {
77 Self {
78 dual_engine: manager,
79 }
80 }
81}
82
83#[async_trait::async_trait]
84impl flow_server::Flow for FlowService {
85 async fn handle_create_remove(
86 &self,
87 request: Request<FlowRequest>,
88 ) -> Result<Response<FlowResponse>, Status> {
89 let _timer = METRIC_FLOW_PROCESSING_TIME
90 .with_label_values(&["ddl"])
91 .start_timer();
92
93 let request = request.into_inner();
94 self.dual_engine
95 .handle(request)
96 .await
97 .map_err(|err| {
98 common_telemetry::error!(err; "Failed to handle flow request");
99 err
100 })
101 .map(Response::new)
102 .map_err(to_status_with_last_err)
103 }
104
105 async fn handle_mirror_request(
106 &self,
107 request: Request<InsertRequests>,
108 ) -> Result<Response<FlowResponse>, Status> {
109 let _timer = METRIC_FLOW_PROCESSING_TIME
110 .with_label_values(&["insert"])
111 .start_timer();
112
113 let request = request.into_inner();
114 let mut row_count = 0;
116 let request = api::v1::region::InsertRequests {
117 requests: request
118 .requests
119 .into_iter()
120 .map(|insert| {
121 insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
122 api::v1::region::InsertRequest {
123 region_id: insert.region_id,
124 rows: insert.rows,
125 }
126 })
127 .collect_vec(),
128 };
129
130 METRIC_FLOW_ROWS
131 .with_label_values(&["in"])
132 .inc_by(row_count as u64);
133
134 self.dual_engine
135 .handle_inserts(request)
136 .await
137 .map(Response::new)
138 .map_err(to_status_with_last_err)
139 }
140
141 async fn handle_mark_dirty_time_window(
142 &self,
143 reqs: Request<DirtyWindowRequests>,
144 ) -> Result<Response<FlowResponse>, Status> {
145 self.dual_engine
146 .handle_mark_window_dirty(reqs.into_inner())
147 .await
148 .map(Response::new)
149 .map_err(to_status_with_last_err)
150 }
151}
152
/// Cloneable handle to the flownode server state.
///
/// Cloning only clones the inner [`Arc`], so all clones share the same
/// `FlownodeServerInner`.
#[derive(Clone)]
pub struct FlownodeServer {
    /// Shared server state.
    inner: Arc<FlownodeServerInner>,
}
157
/// State shared by all clones of `FlownodeServer`.
struct FlownodeServerInner {
    /// Broadcast sender for worker shutdown: `start_workers` subscribes a receiver from it
    /// for the background streaming task, and `stop_workers` sends `()` on it.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Second broadcast sender created alongside the worker one.
    // NOTE(review): not used in the visible part of this file; presumably intended for
    // shutting down the server itself — confirm.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Join handle of the background streaming task; `None` until `start_workers` stores it.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    /// The service whose dual engine is driven by this server.
    flow_service: FlowService,
}
170
171impl FlownodeServer {
172 pub fn new(flow_service: FlowService) -> Self {
173 let (tx, _rx) = broadcast::channel::<()>(1);
174 let (server_tx, _server_rx) = broadcast::channel::<()>(1);
175 Self {
176 inner: Arc::new(FlownodeServerInner {
177 flow_service,
178 worker_shutdown_tx: Mutex::new(tx),
179 server_shutdown_tx: Mutex::new(server_tx),
180 streaming_task_handler: Mutex::new(None),
181 }),
182 }
183 }
184
185 async fn start_workers(&self) -> Result<(), Error> {
189 let manager_ref = self.inner.flow_service.dual_engine.clone();
190 let handle = manager_ref
191 .streaming_engine()
192 .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
193 self.inner
194 .streaming_task_handler
195 .lock()
196 .await
197 .replace(handle);
198
199 self.inner
200 .flow_service
201 .dual_engine
202 .start_flow_consistent_check_task()
203 .await?;
204
205 Ok(())
206 }
207
208 async fn stop_workers(&self) -> Result<(), Error> {
210 let tx = self.inner.worker_shutdown_tx.lock().await;
211 if tx.send(()).is_err() {
212 info!("Receiver dropped, the flow node server has already shutdown");
213 }
214 self.inner
215 .flow_service
216 .dual_engine
217 .stop_flow_consistent_check_task()
218 .await?;
219 Ok(())
220 }
221}
222
223impl FlownodeServer {
224 pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
225 flow_server::FlowServer::new(self.inner.flow_service.clone())
226 .accept_compressed(CompressionEncoding::Gzip)
227 .send_compressed(CompressionEncoding::Gzip)
228 .accept_compressed(CompressionEncoding::Zstd)
229 .send_compressed(CompressionEncoding::Zstd)
230 }
231}
232
/// A flownode: the flow server plus the network services and the optional heartbeat task
/// that are started and shut down together with it.
pub struct FlownodeInstance {
    /// The flownode server whose workers are started in `start` and stopped in `shutdown`.
    flownode_server: FlownodeServer,
    /// Server handlers started and shut down as a group; replaced by `setup_services`.
    services: ServerHandlers,
    /// Heartbeat task, if one was configured on the builder.
    heartbeat_task: Option<HeartbeatTask>,
}
239
240impl FlownodeInstance {
241 pub async fn start(&mut self) -> Result<(), crate::Error> {
242 if let Some(task) = &self.heartbeat_task {
243 task.start().await?;
244 }
245
246 self.flownode_server.start_workers().await?;
247
248 self.services.start_all().await.context(StartServerSnafu)?;
249
250 Ok(())
251 }
252 pub async fn shutdown(&mut self) -> Result<(), Error> {
253 self.services
254 .shutdown_all()
255 .await
256 .context(ShutdownServerSnafu)?;
257
258 self.flownode_server.stop_workers().await?;
259
260 if let Some(task) = &self.heartbeat_task {
261 task.shutdown();
262 }
263
264 Ok(())
265 }
266
267 pub fn flownode_server(&self) -> &FlownodeServer {
268 &self.flownode_server
269 }
270
271 pub fn flow_engine(&self) -> FlowDualEngineRef {
272 self.flownode_server.inner.flow_service.dual_engine.clone()
273 }
274
275 pub fn setup_services(&mut self, services: ServerHandlers) {
276 self.services = services;
277 }
278}
279
280pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
281 if let Some(user_provider) = fn_opts.user_provider.as_ref() {
282 let static_provider = auth::static_user_provider_from_option(user_provider)
283 .context(IllegalAuthConfigSnafu)?;
284
285 let (usr, pwd) = static_provider
286 .get_one_user_pwd()
287 .context(IllegalAuthConfigSnafu)?;
288 let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
289 return Ok(Some(auth_header));
290 }
291
292 Ok(None)
293}
294
/// Builder that assembles a `FlownodeInstance` from its options and shared components.
pub struct FlownodeBuilder {
    /// Flownode options; `build` reads `query`, `flow.batching_mode`, `flow.num_workers`
    /// and `node_id` from them.
    opts: FlownodeOptions,
    /// Plugins passed on to the dual engine.
    plugins: Plugins,
    /// Table metadata manager shared with the streaming and batching engines.
    table_meta: TableMetadataManagerRef,
    /// Catalog manager shared with the query engine factory and the engines.
    catalog_manager: CatalogManagerRef,
    /// Flow metadata manager shared with the batching and dual engines.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Heartbeat task, set by `with_heartbeat_task`.
    heartbeat_task: Option<HeartbeatTask>,
    /// Receiver half created in `with_heartbeat_task`; handed to the streaming engine
    /// when it is built.
    state_report_handler: Option<StateReportHandler>,
    /// Client handed to the batching engine.
    frontend_client: Arc<FrontendClient>,
}
307
308impl FlownodeBuilder {
309 pub fn new(
311 opts: FlownodeOptions,
312 plugins: Plugins,
313 table_meta: TableMetadataManagerRef,
314 catalog_manager: CatalogManagerRef,
315 flow_metadata_manager: FlowMetadataManagerRef,
316 frontend_client: Arc<FrontendClient>,
317 ) -> Self {
318 Self {
319 opts,
320 plugins,
321 table_meta,
322 catalog_manager,
323 flow_metadata_manager,
324 heartbeat_task: None,
325 state_report_handler: None,
326 frontend_client,
327 }
328 }
329
330 pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
331 let (sender, receiver) = SizeReportSender::new();
332 Self {
333 heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
334 state_report_handler: Some(receiver),
335 ..self
336 }
337 }
338
339 pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
340 let query_engine_factory = QueryEngineFactory::new_with_plugins(
342 self.catalog_manager.clone(),
344 None,
345 None,
346 None,
347 None,
348 None,
349 false,
350 Default::default(),
351 self.opts.query.clone(),
352 );
353 let manager = Arc::new(
354 self.build_manager(query_engine_factory.query_engine())
355 .await?,
356 );
357 let batching = Arc::new(BatchingEngine::new(
358 self.frontend_client.clone(),
359 query_engine_factory.query_engine(),
360 self.flow_metadata_manager.clone(),
361 self.table_meta.clone(),
362 self.catalog_manager.clone(),
363 self.opts.flow.batching_mode.clone(),
364 ));
365 let dual = FlowDualEngine::new(
366 manager.clone(),
367 batching,
368 self.flow_metadata_manager.clone(),
369 self.catalog_manager.clone(),
370 self.plugins.clone(),
371 );
372
373 let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
374
375 let heartbeat_task = self.heartbeat_task;
376
377 let instance = FlownodeInstance {
378 flownode_server: server,
379 services: ServerHandlers::default(),
380 heartbeat_task,
381 };
382 Ok(instance)
383 }
384
385 async fn build_manager(
388 &mut self,
389 query_engine: Arc<dyn QueryEngine>,
390 ) -> Result<StreamingEngine, Error> {
391 let table_meta = self.table_meta.clone();
392
393 register_function_to_query_engine(&query_engine);
394
395 let num_workers = self.opts.flow.num_workers;
396
397 let node_id = self.opts.node_id.map(|id| id as u32);
398
399 let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
400 for worker_id in 0..num_workers {
401 let (tx, rx) = oneshot::channel();
402
403 let _handle = std::thread::Builder::new()
404 .name(format!("flow-worker-{}", worker_id))
405 .spawn(move || {
406 let (handle, mut worker) = create_worker();
407 let _ = tx.send(handle);
408 info!("Flow Worker started in new thread");
409 worker.run();
410 });
411 let worker_handle = rx.await.map_err(|e| {
412 UnexpectedSnafu {
413 reason: format!("Failed to receive worker handle: {}", e),
414 }
415 .build()
416 })?;
417 man.add_worker_handle(worker_handle);
418 }
419 if let Some(handler) = self.state_report_handler.take() {
420 man = man.with_state_report_handler(handler).await;
421 }
422 info!("Flow Node Manager started");
423 Ok(man)
424 }
425}
426
/// Builder for the `ServerHandlers` (gRPC and optionally HTTP) of a flownode.
pub struct FlownodeServiceBuilder<'a> {
    /// Flownode options; the gRPC and HTTP bind addresses and server settings are read
    /// from them.
    opts: &'a FlownodeOptions,
    /// gRPC server to register, if one was provided.
    grpc_server: Option<GrpcServer>,
    /// Whether `build` should also create and register the HTTP server.
    enable_http_service: bool,
}
433
434impl<'a> FlownodeServiceBuilder<'a> {
435 pub fn new(opts: &'a FlownodeOptions) -> Self {
436 Self {
437 opts,
438 grpc_server: None,
439 enable_http_service: false,
440 }
441 }
442
443 pub fn enable_http_service(self) -> Self {
444 Self {
445 enable_http_service: true,
446 ..self
447 }
448 }
449
450 pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
451 Self {
452 grpc_server: Some(grpc_server),
453 ..self
454 }
455 }
456
457 pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
458 let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
459 self.grpc_server = Some(grpc_server);
460 self
461 }
462
463 pub fn build(mut self) -> Result<ServerHandlers, Error> {
464 let handlers = ServerHandlers::default();
465 if let Some(grpc_server) = self.grpc_server.take() {
466 let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
467 addr: &self.opts.grpc.bind_addr,
468 })?;
469 let handler: ServerHandler = (Box::new(grpc_server), addr);
470 handlers.insert(handler);
471 }
472
473 if self.enable_http_service {
474 let http_server = HttpServerBuilder::new(self.opts.http.clone())
475 .with_metrics_handler(MetricsHandler)
476 .build();
477 let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
478 addr: &self.opts.http.addr,
479 })?;
480 let handler: ServerHandler = (Box::new(http_server), addr);
481 handlers.insert(handler);
482 }
483 Ok(handlers)
484 }
485
486 pub fn grpc_server_builder(
487 opts: &FlownodeOptions,
488 flownode_server: &FlownodeServer,
489 ) -> GrpcServerBuilder {
490 let config = GrpcServerConfig {
491 max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
492 max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
493 tls: opts.grpc.tls.clone(),
494 max_connection_age: opts.grpc.max_connection_age,
495 };
496 let service = flownode_server.create_flow_service();
497 let runtime = common_runtime::global_runtime();
498 let mut builder = GrpcServerBuilder::new(config, runtime);
499 add_service!(builder, service);
500 builder
501 }
502}
503
/// Bundle of the frontend-side components (inserter, deleter, statement executor) used to
/// issue row inserts and deletes.
///
/// Cloning is cheap: every field is an [`Arc`].
#[derive(Clone)]
pub struct FrontendInvoker {
    /// Handles row insert requests.
    inserter: Arc<Inserter>,
    /// Handles row delete requests.
    deleter: Arc<Deleter>,
    /// Statement executor; also passed to the inserter when handling row inserts.
    statement_executor: Arc<StatementExecutor>,
}
514
515impl FrontendInvoker {
516 pub fn new(
517 inserter: Arc<Inserter>,
518 deleter: Arc<Deleter>,
519 statement_executor: Arc<StatementExecutor>,
520 ) -> Self {
521 Self {
522 inserter,
523 deleter,
524 statement_executor,
525 }
526 }
527
528 pub async fn build_from(
529 flow_streaming_engine: FlowStreamingEngineRef,
530 catalog_manager: CatalogManagerRef,
531 kv_backend: KvBackendRef,
532 layered_cache_registry: LayeredCacheRegistryRef,
533 procedure_executor: ProcedureExecutorRef,
534 node_manager: NodeManagerRef,
535 ) -> Result<FrontendInvoker, Error> {
536 let table_route_cache: TableRouteCacheRef =
537 layered_cache_registry.get().context(CacheRequiredSnafu {
538 name: TABLE_ROUTE_CACHE_NAME,
539 })?;
540
541 let partition_manager = Arc::new(PartitionRuleManager::new(
542 kv_backend.clone(),
543 table_route_cache.clone(),
544 ));
545
546 let table_flownode_cache: TableFlownodeSetCacheRef =
547 layered_cache_registry.get().context(CacheRequiredSnafu {
548 name: TABLE_FLOWNODE_SET_CACHE_NAME,
549 })?;
550
551 let inserter = Arc::new(Inserter::new(
552 catalog_manager.clone(),
553 partition_manager.clone(),
554 node_manager.clone(),
555 table_flownode_cache,
556 ));
557
558 let deleter = Arc::new(Deleter::new(
559 catalog_manager.clone(),
560 partition_manager.clone(),
561 node_manager.clone(),
562 ));
563
564 let query_engine = flow_streaming_engine.query_engine.clone();
565
566 let statement_executor = Arc::new(StatementExecutor::new(
567 catalog_manager.clone(),
568 query_engine.clone(),
569 procedure_executor.clone(),
570 kv_backend.clone(),
571 layered_cache_registry.clone(),
572 inserter.clone(),
573 table_route_cache,
574 None,
575 ));
576
577 let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
578 Ok(invoker)
579 }
580}
581
582impl FrontendInvoker {
583 pub async fn row_inserts(
584 &self,
585 requests: RowInsertRequests,
586 ctx: QueryContextRef,
587 ) -> common_frontend::error::Result<Output> {
588 let _timer = METRIC_FLOW_PROCESSING_TIME
589 .with_label_values(&["output_insert"])
590 .start_timer();
591
592 self.inserter
593 .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
594 .await
595 .map_err(BoxedError::new)
596 .context(common_frontend::error::ExternalSnafu)
597 }
598
599 pub async fn row_deletes(
600 &self,
601 requests: RowDeleteRequests,
602 ctx: QueryContextRef,
603 ) -> common_frontend::error::Result<Output> {
604 let _timer = METRIC_FLOW_PROCESSING_TIME
605 .with_label_values(&["output_delete"])
606 .start_timer();
607
608 self.deleter
609 .handle_row_deletes(requests, ctx)
610 .await
611 .map_err(BoxedError::new)
612 .context(common_frontend::error::ExternalSnafu)
613 }
614
615 pub fn statement_executor(&self) -> Arc<StatementExecutor> {
616 self.statement_executor.clone()
617 }
618}
619
620pub(crate) async fn get_all_flow_ids(
622 flow_metadata_manager: &FlowMetadataManagerRef,
623 catalog_manager: &CatalogManagerRef,
624 nodeid: Option<u64>,
625) -> Result<Vec<u32>, Error> {
626 let ret = if let Some(nodeid) = nodeid {
627 let flow_ids_one_node = flow_metadata_manager
628 .flownode_flow_manager()
629 .flows(nodeid)
630 .try_collect::<Vec<_>>()
631 .await
632 .context(ListFlowsSnafu { id: Some(nodeid) })?;
633 flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
634 } else {
635 let all_catalogs = catalog_manager
636 .catalog_names()
637 .await
638 .map_err(BoxedError::new)
639 .context(ExternalSnafu)?;
640 let mut all_flow_ids = vec![];
641 for catalog in all_catalogs {
642 let flows = flow_metadata_manager
643 .flow_name_manager()
644 .flow_names(&catalog)
645 .await
646 .try_collect::<Vec<_>>()
647 .await
648 .map_err(BoxedError::new)
649 .context(ExternalSnafu)?;
650
651 all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
652 }
653 all_flow_ids
654 };
655
656 Ok(ret)
657}