1use std::net::SocketAddr;
18use std::sync::Arc;
19
20use api::v1::{RowDeleteRequests, RowInsertRequests};
21use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
22use catalog::CatalogManagerRef;
23use common_base::Plugins;
24use common_error::ext::BoxedError;
25use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
26use common_meta::ddl::ProcedureExecutorRef;
27use common_meta::key::flow::FlowMetadataManagerRef;
28use common_meta::key::TableMetadataManagerRef;
29use common_meta::kv_backend::KvBackendRef;
30use common_meta::node_manager::{Flownode, NodeManagerRef};
31use common_query::Output;
32use common_runtime::JoinHandle;
33use common_telemetry::tracing::info;
34use futures::{FutureExt, TryStreamExt};
35use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
36use itertools::Itertools;
37use operator::delete::Deleter;
38use operator::insert::Inserter;
39use operator::statement::StatementExecutor;
40use partition::manager::PartitionRuleManager;
41use query::{QueryEngine, QueryEngineFactory};
42use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
43use servers::http::HttpServerBuilder;
44use servers::metrics_handler::MetricsHandler;
45use servers::server::{ServerHandler, ServerHandlers};
46use session::context::QueryContextRef;
47use snafu::{OptionExt, ResultExt};
48use tokio::net::TcpListener;
49use tokio::sync::{broadcast, oneshot, Mutex};
50use tonic::codec::CompressionEncoding;
51use tonic::transport::server::TcpIncoming;
52use tonic::{Request, Response, Status};
53
54use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
55use crate::adapter::{create_worker, FlowStreamingEngineRef};
56use crate::batching_mode::engine::BatchingEngine;
57use crate::error::{
58 to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu,
59 ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
60};
61use crate::heartbeat::HeartbeatTask;
62use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
63use crate::transform::register_function_to_query_engine;
64use crate::utils::{SizeReportSender, StateReportHandler};
65use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
66
/// Name under which the flow node gRPC server is registered (see [`servers::server::Server::name`]).
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
68#[derive(Clone)]
70pub struct FlowService {
71 pub dual_engine: FlowDualEngineRef,
72}
73
74impl FlowService {
75 pub fn new(manager: FlowDualEngineRef) -> Self {
76 Self {
77 dual_engine: manager,
78 }
79 }
80}
81
82#[async_trait::async_trait]
83impl flow_server::Flow for FlowService {
84 async fn handle_create_remove(
85 &self,
86 request: Request<FlowRequest>,
87 ) -> Result<Response<FlowResponse>, Status> {
88 let _timer = METRIC_FLOW_PROCESSING_TIME
89 .with_label_values(&["ddl"])
90 .start_timer();
91
92 let request = request.into_inner();
93 self.dual_engine
94 .handle(request)
95 .await
96 .map_err(|err| {
97 common_telemetry::error!(err; "Failed to handle flow request");
98 err
99 })
100 .map(Response::new)
101 .map_err(to_status_with_last_err)
102 }
103
104 async fn handle_mirror_request(
105 &self,
106 request: Request<InsertRequests>,
107 ) -> Result<Response<FlowResponse>, Status> {
108 let _timer = METRIC_FLOW_PROCESSING_TIME
109 .with_label_values(&["insert"])
110 .start_timer();
111
112 let request = request.into_inner();
113 let mut row_count = 0;
115 let request = api::v1::region::InsertRequests {
116 requests: request
117 .requests
118 .into_iter()
119 .map(|insert| {
120 insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
121 api::v1::region::InsertRequest {
122 region_id: insert.region_id,
123 rows: insert.rows,
124 }
125 })
126 .collect_vec(),
127 };
128
129 METRIC_FLOW_ROWS
130 .with_label_values(&["in"])
131 .inc_by(row_count as u64);
132
133 self.dual_engine
134 .handle_inserts(request)
135 .await
136 .map(Response::new)
137 .map_err(to_status_with_last_err)
138 }
139}
140
/// The flow node gRPC server; cheaply cloneable via the shared inner state.
#[derive(Clone)]
pub struct FlownodeServer {
    inner: Arc<FlownodeServerInner>,
}

/// State shared by all clones of [`FlownodeServer`].
struct FlownodeServerInner {
    // Broadcast sender used to signal the streaming-engine workers to stop.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    // Broadcast sender used to signal the gRPC serving task to stop.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    // Handle of the streaming engine background task; populated by `start_workers`.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    flow_service: FlowService,
}
158
159impl FlownodeServer {
160 pub fn new(flow_service: FlowService) -> Self {
161 let (tx, _rx) = broadcast::channel::<()>(1);
162 let (server_tx, _server_rx) = broadcast::channel::<()>(1);
163 Self {
164 inner: Arc::new(FlownodeServerInner {
165 flow_service,
166 worker_shutdown_tx: Mutex::new(tx),
167 server_shutdown_tx: Mutex::new(server_tx),
168 streaming_task_handler: Mutex::new(None),
169 }),
170 }
171 }
172
173 async fn start_workers(&self) -> Result<(), Error> {
177 let manager_ref = self.inner.flow_service.dual_engine.clone();
178 let handle = manager_ref
179 .streaming_engine()
180 .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
181 self.inner
182 .streaming_task_handler
183 .lock()
184 .await
185 .replace(handle);
186
187 self.inner
188 .flow_service
189 .dual_engine
190 .start_flow_consistent_check_task()
191 .await?;
192
193 Ok(())
194 }
195
196 async fn stop_workers(&self) -> Result<(), Error> {
198 let tx = self.inner.worker_shutdown_tx.lock().await;
199 if tx.send(()).is_err() {
200 info!("Receiver dropped, the flow node server has already shutdown");
201 }
202 self.inner
203 .flow_service
204 .dual_engine
205 .stop_flow_consistent_check_task()
206 .await?;
207 Ok(())
208 }
209}
210
211impl FlownodeServer {
212 pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
213 flow_server::FlowServer::new(self.inner.flow_service.clone())
214 .accept_compressed(CompressionEncoding::Gzip)
215 .send_compressed(CompressionEncoding::Gzip)
216 .accept_compressed(CompressionEncoding::Zstd)
217 .send_compressed(CompressionEncoding::Zstd)
218 }
219}
220
221#[async_trait::async_trait]
222impl servers::server::Server for FlownodeServer {
223 async fn shutdown(&self) -> Result<(), servers::error::Error> {
224 let tx = self.inner.server_shutdown_tx.lock().await;
225 if tx.send(()).is_err() {
226 info!("Receiver dropped, the flow node server has already shutdown");
227 }
228 info!("Shutdown flow node server");
229
230 Ok(())
231 }
232
233 async fn start(&mut self, addr: SocketAddr) -> Result<(), servers::error::Error> {
234 let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe();
235
236 let incoming = {
237 let listener = TcpListener::bind(addr)
238 .await
239 .context(TcpBindSnafu { addr })?;
240 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
241 let incoming =
242 TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
243 info!("flow server is bound to {}", addr);
244
245 incoming
246 };
247
248 let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
249
250 let _handle = common_runtime::spawn_global(async move {
251 let _result = builder
252 .serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop))
253 .await
254 .context(StartGrpcSnafu);
255 });
256
257 Ok(())
258 }
259
260 fn name(&self) -> &str {
261 FLOW_NODE_SERVER_NAME
262 }
263}
264
/// A fully built flow node: the gRPC server, its registered services, and an
/// optional heartbeat task toward the meta server.
pub struct FlownodeInstance {
    flownode_server: FlownodeServer,
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
}
271
272impl FlownodeInstance {
273 pub async fn start(&mut self) -> Result<(), crate::Error> {
274 if let Some(task) = &self.heartbeat_task {
275 task.start().await?;
276 }
277
278 self.flownode_server.start_workers().await?;
279
280 self.services.start_all().await.context(StartServerSnafu)?;
281
282 Ok(())
283 }
284 pub async fn shutdown(&mut self) -> Result<(), Error> {
285 self.services
286 .shutdown_all()
287 .await
288 .context(ShutdownServerSnafu)?;
289
290 self.flownode_server.stop_workers().await?;
291
292 if let Some(task) = &self.heartbeat_task {
293 task.shutdown();
294 }
295
296 Ok(())
297 }
298
299 pub fn flownode_server(&self) -> &FlownodeServer {
300 &self.flownode_server
301 }
302
303 pub fn flow_engine(&self) -> FlowDualEngineRef {
304 self.flownode_server.inner.flow_service.dual_engine.clone()
305 }
306
307 pub fn setup_services(&mut self, services: ServerHandlers) {
308 self.services = services;
309 }
310}
311
312pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
313 if let Some(user_provider) = fn_opts.user_provider.as_ref() {
314 let static_provider = auth::static_user_provider_from_option(user_provider)
315 .context(IllegalAuthConfigSnafu)?;
316
317 let (usr, pwd) = static_provider
318 .get_one_user_pwd()
319 .context(IllegalAuthConfigSnafu)?;
320 let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
321 return Ok(Some(auth_header));
322 }
323
324 Ok(None)
325}
326
/// Builder assembling a [`FlownodeInstance`] from its dependencies.
pub struct FlownodeBuilder {
    opts: FlownodeOptions,
    plugins: Plugins,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    // Set (together with `state_report_handler`) by `with_heartbeat_task`.
    heartbeat_task: Option<HeartbeatTask>,
    // Receiving end of size reports wired into the streaming engine.
    state_report_handler: Option<StateReportHandler>,
    frontend_client: Arc<FrontendClient>,
}
339
340impl FlownodeBuilder {
341 pub fn new(
343 opts: FlownodeOptions,
344 plugins: Plugins,
345 table_meta: TableMetadataManagerRef,
346 catalog_manager: CatalogManagerRef,
347 flow_metadata_manager: FlowMetadataManagerRef,
348 frontend_client: Arc<FrontendClient>,
349 ) -> Self {
350 Self {
351 opts,
352 plugins,
353 table_meta,
354 catalog_manager,
355 flow_metadata_manager,
356 heartbeat_task: None,
357 state_report_handler: None,
358 frontend_client,
359 }
360 }
361
362 pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
363 let (sender, receiver) = SizeReportSender::new();
364 Self {
365 heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
366 state_report_handler: Some(receiver),
367 ..self
368 }
369 }
370
371 pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
372 let query_engine_factory = QueryEngineFactory::new_with_plugins(
374 self.catalog_manager.clone(),
376 None,
377 None,
378 None,
379 None,
380 false,
381 Default::default(),
382 self.opts.query.clone(),
383 );
384 let manager = Arc::new(
385 self.build_manager(query_engine_factory.query_engine())
386 .await?,
387 );
388 let batching = Arc::new(BatchingEngine::new(
389 self.frontend_client.clone(),
390 query_engine_factory.query_engine(),
391 self.flow_metadata_manager.clone(),
392 self.table_meta.clone(),
393 self.catalog_manager.clone(),
394 ));
395 let dual = FlowDualEngine::new(
396 manager.clone(),
397 batching,
398 self.flow_metadata_manager.clone(),
399 self.catalog_manager.clone(),
400 self.plugins.clone(),
401 );
402
403 let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
404
405 let heartbeat_task = self.heartbeat_task;
406
407 let instance = FlownodeInstance {
408 flownode_server: server,
409 services: ServerHandlers::default(),
410 heartbeat_task,
411 };
412 Ok(instance)
413 }
414
415 async fn build_manager(
418 &mut self,
419 query_engine: Arc<dyn QueryEngine>,
420 ) -> Result<StreamingEngine, Error> {
421 let table_meta = self.table_meta.clone();
422
423 register_function_to_query_engine(&query_engine);
424
425 let num_workers = self.opts.flow.num_workers;
426
427 let node_id = self.opts.node_id.map(|id| id as u32);
428
429 let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
430 for worker_id in 0..num_workers {
431 let (tx, rx) = oneshot::channel();
432
433 let _handle = std::thread::Builder::new()
434 .name(format!("flow-worker-{}", worker_id))
435 .spawn(move || {
436 let (handle, mut worker) = create_worker();
437 let _ = tx.send(handle);
438 info!("Flow Worker started in new thread");
439 worker.run();
440 });
441 let worker_handle = rx.await.map_err(|e| {
442 UnexpectedSnafu {
443 reason: format!("Failed to receive worker handle: {}", e),
444 }
445 .build()
446 })?;
447 man.add_worker_handle(worker_handle);
448 }
449 if let Some(handler) = self.state_report_handler.take() {
450 man = man.with_state_report_handler(handler).await;
451 }
452 info!("Flow Node Manager started");
453 Ok(man)
454 }
455}
456
/// Builder producing the [`ServerHandlers`] (gRPC and optional HTTP metrics
/// service) for a flow node, from its options.
pub struct FlownodeServiceBuilder<'a> {
    opts: &'a FlownodeOptions,
    grpc_server: Option<FlownodeServer>,
    enable_http_service: bool,
}
463
464impl<'a> FlownodeServiceBuilder<'a> {
465 pub fn new(opts: &'a FlownodeOptions) -> Self {
466 Self {
467 opts,
468 grpc_server: None,
469 enable_http_service: false,
470 }
471 }
472
473 pub fn enable_http_service(self) -> Self {
474 Self {
475 enable_http_service: true,
476 ..self
477 }
478 }
479
480 pub fn with_grpc_server(self, grpc_server: FlownodeServer) -> Self {
481 Self {
482 grpc_server: Some(grpc_server),
483 ..self
484 }
485 }
486
487 pub fn build(mut self) -> Result<ServerHandlers, Error> {
488 let handlers = ServerHandlers::default();
489 if let Some(grpc_server) = self.grpc_server.take() {
490 let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
491 addr: &self.opts.grpc.bind_addr,
492 })?;
493 let handler: ServerHandler = (Box::new(grpc_server), addr);
494 handlers.insert(handler);
495 }
496
497 if self.enable_http_service {
498 let http_server = HttpServerBuilder::new(self.opts.http.clone())
499 .with_metrics_handler(MetricsHandler)
500 .build();
501 let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
502 addr: &self.opts.http.addr,
503 })?;
504 let handler: ServerHandler = (Box::new(http_server), addr);
505 handlers.insert(handler);
506 }
507 Ok(handlers)
508 }
509}
510
/// Local stand-in for a frontend: lets the flow engine write results back
/// (inserts/deletes) and run statements without a remote frontend round trip.
#[derive(Clone)]
pub struct FrontendInvoker {
    inserter: Arc<Inserter>,
    deleter: Arc<Deleter>,
    statement_executor: Arc<StatementExecutor>,
}
521
522impl FrontendInvoker {
523 pub fn new(
524 inserter: Arc<Inserter>,
525 deleter: Arc<Deleter>,
526 statement_executor: Arc<StatementExecutor>,
527 ) -> Self {
528 Self {
529 inserter,
530 deleter,
531 statement_executor,
532 }
533 }
534
535 pub async fn build_from(
536 flow_streaming_engine: FlowStreamingEngineRef,
537 catalog_manager: CatalogManagerRef,
538 kv_backend: KvBackendRef,
539 layered_cache_registry: LayeredCacheRegistryRef,
540 procedure_executor: ProcedureExecutorRef,
541 node_manager: NodeManagerRef,
542 ) -> Result<FrontendInvoker, Error> {
543 let table_route_cache: TableRouteCacheRef =
544 layered_cache_registry.get().context(CacheRequiredSnafu {
545 name: TABLE_ROUTE_CACHE_NAME,
546 })?;
547
548 let partition_manager = Arc::new(PartitionRuleManager::new(
549 kv_backend.clone(),
550 table_route_cache.clone(),
551 ));
552
553 let table_flownode_cache: TableFlownodeSetCacheRef =
554 layered_cache_registry.get().context(CacheRequiredSnafu {
555 name: TABLE_FLOWNODE_SET_CACHE_NAME,
556 })?;
557
558 let inserter = Arc::new(Inserter::new(
559 catalog_manager.clone(),
560 partition_manager.clone(),
561 node_manager.clone(),
562 table_flownode_cache,
563 ));
564
565 let deleter = Arc::new(Deleter::new(
566 catalog_manager.clone(),
567 partition_manager.clone(),
568 node_manager.clone(),
569 ));
570
571 let query_engine = flow_streaming_engine.query_engine.clone();
572
573 let statement_executor = Arc::new(StatementExecutor::new(
574 catalog_manager.clone(),
575 query_engine.clone(),
576 procedure_executor.clone(),
577 kv_backend.clone(),
578 layered_cache_registry.clone(),
579 inserter.clone(),
580 table_route_cache,
581 ));
582
583 let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
584 Ok(invoker)
585 }
586}
587
588impl FrontendInvoker {
589 pub async fn row_inserts(
590 &self,
591 requests: RowInsertRequests,
592 ctx: QueryContextRef,
593 ) -> common_frontend::error::Result<Output> {
594 let _timer = METRIC_FLOW_PROCESSING_TIME
595 .with_label_values(&["output_insert"])
596 .start_timer();
597
598 self.inserter
599 .handle_row_inserts(requests, ctx, &self.statement_executor)
600 .await
601 .map_err(BoxedError::new)
602 .context(common_frontend::error::ExternalSnafu)
603 }
604
605 pub async fn row_deletes(
606 &self,
607 requests: RowDeleteRequests,
608 ctx: QueryContextRef,
609 ) -> common_frontend::error::Result<Output> {
610 let _timer = METRIC_FLOW_PROCESSING_TIME
611 .with_label_values(&["output_delete"])
612 .start_timer();
613
614 self.deleter
615 .handle_row_deletes(requests, ctx)
616 .await
617 .map_err(BoxedError::new)
618 .context(common_frontend::error::ExternalSnafu)
619 }
620
621 pub fn statement_executor(&self) -> Arc<StatementExecutor> {
622 self.statement_executor.clone()
623 }
624}
625
626pub(crate) async fn get_all_flow_ids(
628 flow_metadata_manager: &FlowMetadataManagerRef,
629 catalog_manager: &CatalogManagerRef,
630 nodeid: Option<u64>,
631) -> Result<Vec<u32>, Error> {
632 let ret = if let Some(nodeid) = nodeid {
633 let flow_ids_one_node = flow_metadata_manager
634 .flownode_flow_manager()
635 .flows(nodeid)
636 .try_collect::<Vec<_>>()
637 .await
638 .context(ListFlowsSnafu { id: Some(nodeid) })?;
639 flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
640 } else {
641 let all_catalogs = catalog_manager
642 .catalog_names()
643 .await
644 .map_err(BoxedError::new)
645 .context(ExternalSnafu)?;
646 let mut all_flow_ids = vec![];
647 for catalog in all_catalogs {
648 let flows = flow_metadata_manager
649 .flow_name_manager()
650 .flow_names(&catalog)
651 .await
652 .try_collect::<Vec<_>>()
653 .await
654 .map_err(BoxedError::new)
655 .context(ExternalSnafu)?;
656
657 all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
658 }
659 all_flow_ids
660 };
661
662 Ok(ret)
663}