flow/server.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implementation of the gRPC service for the flow node.

use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info;
use futures::{FutureExt, TryStreamExt};
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};

use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{create_worker, FlowStreamingEngineRef};
use crate::batching_mode::engine::BatchingEngine;
use crate::error::{
    to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu,
    ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};

pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";

/// Wrapper around the flow node manager, so that traits can be implemented
/// without hitting the orphan rule on `Arc<...>`.
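///
/// A minimal usage sketch (assuming a `FlowDualEngineRef` named `engine` has
/// already been built, e.g. by [`FlownodeBuilder`]):
///
/// ```ignore
/// let service = FlowService::new(engine);
/// let server = FlownodeServer::new(service);
/// let grpc_service = server.create_flow_service();
/// ```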
#[derive(Clone)]
pub struct FlowService {
    pub dual_engine: FlowDualEngineRef,
}

impl FlowService {
    pub fn new(manager: FlowDualEngineRef) -> Self {
        Self {
            dual_engine: manager,
        }
    }
}

#[async_trait::async_trait]
impl flow_server::Flow for FlowService {
    async fn handle_create_remove(
        &self,
        request: Request<FlowRequest>,
    ) -> Result<Response<FlowResponse>, Status> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["ddl"])
            .start_timer();

        let request = request.into_inner();
        self.dual_engine
            .handle(request)
            .await
            .map_err(|err| {
                common_telemetry::error!(err; "Failed to handle flow request");
                err
            })
            .map(Response::new)
            .map_err(to_status_with_last_err)
    }

    async fn handle_mirror_request(
        &self,
        request: Request<InsertRequests>,
    ) -> Result<Response<FlowResponse>, Status> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["insert"])
            .start_timer();

        let request = request.into_inner();
        // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
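        // Re-wrap the flow-proto insert requests as region insert requests
        // field by field, counting the incoming rows along the way so the
        // METRIC_FLOW_ROWS counter below reflects the mirrored traffic.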
        let mut row_count = 0;
        let request = api::v1::region::InsertRequests {
            requests: request
                .requests
                .into_iter()
                .map(|insert| {
                    insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
                    api::v1::region::InsertRequest {
                        region_id: insert.region_id,
                        rows: insert.rows,
                    }
                })
                .collect_vec(),
        };

        METRIC_FLOW_ROWS
            .with_label_values(&["in"])
            .inc_by(row_count as u64);

        self.dual_engine
            .handle_inserts(request)
            .await
            .map(Response::new)
            .map_err(to_status_with_last_err)
    }
}

#[derive(Clone)]
pub struct FlownodeServer {
    inner: Arc<FlownodeServerInner>,
}

/// The inner state of [`FlownodeServer`]; mostly useful for constructing,
/// starting, and stopping the flow node server.
struct FlownodeServerInner {
    /// Worker shutdown signal; not to be confused with `server_shutdown_tx`.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Server shutdown signal, used to shut down the gRPC server.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Handle of the streaming computation task.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    flow_service: FlowService,
}

impl FlownodeServer {
    pub fn new(flow_service: FlowService) -> Self {
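        // Capacity 1 is enough here: each channel only ever carries a single
        // shutdown signal, and receivers are created on demand via subscribe().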
        let (tx, _rx) = broadcast::channel::<()>(1);
        let (server_tx, _server_rx) = broadcast::channel::<()>(1);
        Self {
            inner: Arc::new(FlownodeServerInner {
                flow_service,
                worker_shutdown_tx: Mutex::new(tx),
                server_shutdown_tx: Mutex::new(server_tx),
                streaming_task_handler: Mutex::new(None),
            }),
        }
    }

    /// Start the background task for streaming computation.
    ///
    /// Should be called only after the heartbeat is established, so that
    /// cluster info is available.
    async fn start_workers(&self) -> Result<(), Error> {
        let manager_ref = self.inner.flow_service.dual_engine.clone();
        let handle = manager_ref
            .streaming_engine()
            .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
        self.inner
            .streaming_task_handler
            .lock()
            .await
            .replace(handle);

        self.inner
            .flow_service
            .dual_engine
            .start_flow_consistent_check_task()
            .await?;

        Ok(())
    }

    /// Stop the background task for streaming computation.
    async fn stop_workers(&self) -> Result<(), Error> {
        let tx = self.inner.worker_shutdown_tx.lock().await;
        if tx.send(()).is_err() {
            info!("Receiver dropped, the flow node server has already shut down");
        }
        self.inner
            .flow_service
            .dual_engine
            .stop_flow_consistent_check_task()
            .await?;
        Ok(())
    }
}

impl FlownodeServer {
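    /// Build the tonic service wrapper around this node's [`FlowService`].
    ///
    /// Advertising both Gzip and Zstd for `accept` and `send` lets each peer
    /// negotiate whichever compression encoding it supports.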
    pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
        flow_server::FlowServer::new(self.inner.flow_service.clone())
            .accept_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Zstd)
    }
}

#[async_trait::async_trait]
impl servers::server::Server for FlownodeServer {
    async fn shutdown(&self) -> Result<(), servers::error::Error> {
        let tx = self.inner.server_shutdown_tx.lock().await;
        if tx.send(()).is_err() {
            info!("Receiver dropped, the flow node server has already shut down");
        }
        info!("Shutdown flow node server");

        Ok(())
    }

    async fn start(&mut self, addr: SocketAddr) -> Result<(), servers::error::Error> {
        let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe();

        let incoming = {
            let listener = TcpListener::bind(addr)
                .await
                .context(TcpBindSnafu { addr })?;
            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
            let incoming =
                TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
            info!("flow server is bound to {}", addr);

            incoming
        };

        let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());

        let _handle = common_runtime::spawn_global(async move {
            let _result = builder
                .serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop))
                .await
                .context(StartGrpcSnafu);
        });

        Ok(())
    }

    fn name(&self) -> &str {
        FLOW_NODE_SERVER_NAME
    }
}

/// The flownode server instance.
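///
/// Lifecycle sketch (assuming an instance built via [`FlownodeBuilder`]):
///
/// ```ignore
/// instance.start().await?;    // heartbeat task, then workers, then servers
/// // ... serve traffic ...
/// instance.shutdown().await?; // servers, then workers, then heartbeat task
/// ```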
pub struct FlownodeInstance {
    flownode_server: FlownodeServer,
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
}

impl FlownodeInstance {
    pub async fn start(&mut self) -> Result<(), crate::Error> {
        if let Some(task) = &self.heartbeat_task {
            task.start().await?;
        }

        self.flownode_server.start_workers().await?;

        self.services.start_all().await.context(StartServerSnafu)?;

        Ok(())
    }

    pub async fn shutdown(&mut self) -> Result<(), Error> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        self.flownode_server.stop_workers().await?;

        if let Some(task) = &self.heartbeat_task {
            task.shutdown();
        }

        Ok(())
    }

    pub fn flownode_server(&self) -> &FlownodeServer {
        &self.flownode_server
    }

    pub fn flow_engine(&self) -> FlowDualEngineRef {
        self.flownode_server.inner.flow_service.dual_engine.clone()
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }
}

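/// Extract a static user/password pair from the flownode options, if a user
/// provider is configured, and turn it into a [`FlowAuthHeader`].
///
/// A minimal call sketch (how the header is consumed downstream is an
/// assumption here):
///
/// ```ignore
/// let auth: Option<FlowAuthHeader> = get_flow_auth_options(&opts)?;
/// ```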
pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
    if let Some(user_provider) = fn_opts.user_provider.as_ref() {
        let static_provider = auth::static_user_provider_from_option(user_provider)
            .context(IllegalAuthConfigSnafu)?;

        let (usr, pwd) = static_provider
            .get_one_user_pwd()
            .context(IllegalAuthConfigSnafu)?;
        let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
        return Ok(Some(auth_header));
    }

    Ok(None)
}

/// Builder for [`FlownodeInstance`].
pub struct FlownodeBuilder {
    opts: FlownodeOptions,
    plugins: Plugins,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    heartbeat_task: Option<HeartbeatTask>,
    /// Receives a oneshot sender through which state size reports are sent back.
    state_report_handler: Option<StateReportHandler>,
    frontend_client: Arc<FrontendClient>,
}

impl FlownodeBuilder {
    /// Initialize the flownode builder.
    pub fn new(
        opts: FlownodeOptions,
        plugins: Plugins,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        frontend_client: Arc<FrontendClient>,
    ) -> Self {
        Self {
            opts,
            plugins,
            table_meta,
            catalog_manager,
            flow_metadata_manager,
            heartbeat_task: None,
            state_report_handler: None,
            frontend_client,
        }
    }

    pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
        let (sender, receiver) = SizeReportSender::new();
        Self {
            heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
            state_report_handler: Some(receiver),
            ..self
        }
    }

    pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
        // TODO(discord9): does this query engine need those?
        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in flownode is only used to translate plans with resolved table sources.
            self.catalog_manager.clone(),
            None,
            None,
            None,
            None,
            false,
            Default::default(),
            self.opts.query.clone(),
        );
        let manager = Arc::new(
            self.build_manager(query_engine_factory.query_engine())
                .await?,
        );
        let batching = Arc::new(BatchingEngine::new(
            self.frontend_client.clone(),
            query_engine_factory.query_engine(),
            self.flow_metadata_manager.clone(),
            self.table_meta.clone(),
            self.catalog_manager.clone(),
        ));
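        // The dual engine multiplexes between the streaming engine (in-process
        // dataflow workers) and the batching engine (which executes flows
        // through the frontend client).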
        let dual = FlowDualEngine::new(
            manager.clone(),
            batching,
            self.flow_metadata_manager.clone(),
            self.catalog_manager.clone(),
            self.plugins.clone(),
        );

        let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));

        let heartbeat_task = self.heartbeat_task;

        let instance = FlownodeInstance {
            flownode_server: server,
            services: ServerHandlers::default(),
            heartbeat_task,
        };
        Ok(instance)
    }

    /// Build the [`StreamingEngine`]; note this doesn't take ownership of `self`,
    /// nor does it actually start running the workers.
    async fn build_manager(
        &mut self,
        query_engine: Arc<dyn QueryEngine>,
    ) -> Result<StreamingEngine, Error> {
        let table_meta = self.table_meta.clone();

        register_function_to_query_engine(&query_engine);

        let num_workers = self.opts.flow.num_workers;

        let node_id = self.opts.node_id.map(|id| id as u32);

        let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
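        // Each flow worker runs on its own dedicated OS thread; the worker
        // handle is sent back through a oneshot channel once the thread is up.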
        for worker_id in 0..num_workers {
            let (tx, rx) = oneshot::channel();

            let _handle = std::thread::Builder::new()
                .name(format!("flow-worker-{}", worker_id))
                .spawn(move || {
                    let (handle, mut worker) = create_worker();
                    let _ = tx.send(handle);
                    info!("Flow Worker started in new thread");
                    worker.run();
                });
            let worker_handle = rx.await.map_err(|e| {
                UnexpectedSnafu {
                    reason: format!("Failed to receive worker handle: {}", e),
                }
                .build()
            })?;
            man.add_worker_handle(worker_handle);
        }
        if let Some(handler) = self.state_report_handler.take() {
            man = man.with_state_report_handler(handler).await;
        }
        info!("Flow Node Manager started");
        Ok(man)
    }
}

/// Builder for the servers exposed by a flownode; mainly useful in distributed mode.
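///
/// A typical wiring sketch (names taken from this module):
///
/// ```ignore
/// let services = FlownodeServiceBuilder::new(&opts)
///     .with_grpc_server(instance.flownode_server().clone())
///     .enable_http_service()
///     .build()?;
/// instance.setup_services(services);
/// ```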
pub struct FlownodeServiceBuilder<'a> {
    opts: &'a FlownodeOptions,
    grpc_server: Option<FlownodeServer>,
    enable_http_service: bool,
}

impl<'a> FlownodeServiceBuilder<'a> {
    pub fn new(opts: &'a FlownodeOptions) -> Self {
        Self {
            opts,
            grpc_server: None,
            enable_http_service: false,
        }
    }

    pub fn enable_http_service(self) -> Self {
        Self {
            enable_http_service: true,
            ..self
        }
    }

    pub fn with_grpc_server(self, grpc_server: FlownodeServer) -> Self {
        Self {
            grpc_server: Some(grpc_server),
            ..self
        }
    }

    pub fn build(mut self) -> Result<ServerHandlers, Error> {
        let handlers = ServerHandlers::default();
        if let Some(grpc_server) = self.grpc_server.take() {
            let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
                addr: &self.opts.grpc.bind_addr,
            })?;
            let handler: ServerHandler = (Box::new(grpc_server), addr);
            handlers.insert(handler);
        }

        if self.enable_http_service {
            let http_server = HttpServerBuilder::new(self.opts.http.clone())
                .with_metrics_handler(MetricsHandler)
                .build();
            let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
                addr: &self.opts.http.addr,
            })?;
            let handler: ServerHandler = (Box::new(http_server), addr);
            handlers.insert(handler);
        }
        Ok(handlers)
    }
}

/// Basically a tiny frontend that talks to datanodes directly, unlike
/// [`FrontendClient`], which connects to a real frontend. It is used by the
/// flow's streaming engine for simple queries.
///
/// For heavy queries, use [`FrontendClient`], which offloads computation to a
/// frontend and lifts the load from the flownode.
#[derive(Clone)]
pub struct FrontendInvoker {
    inserter: Arc<Inserter>,
    deleter: Arc<Deleter>,
    statement_executor: Arc<StatementExecutor>,
}

impl FrontendInvoker {
    pub fn new(
        inserter: Arc<Inserter>,
        deleter: Arc<Deleter>,
        statement_executor: Arc<StatementExecutor>,
    ) -> Self {
        Self {
            inserter,
            deleter,
            statement_executor,
        }
    }

    pub async fn build_from(
        flow_streaming_engine: FlowStreamingEngineRef,
        catalog_manager: CatalogManagerRef,
        kv_backend: KvBackendRef,
        layered_cache_registry: LayeredCacheRegistryRef,
        procedure_executor: ProcedureExecutorRef,
        node_manager: NodeManagerRef,
    ) -> Result<FrontendInvoker, Error> {
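        // Both caches are fetched from the shared layered cache registry; a
        // missing entry means the registry was assembled incorrectly, so it
        // is reported as a hard error via CacheRequiredSnafu.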
        let table_route_cache: TableRouteCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_ROUTE_CACHE_NAME,
            })?;

        let partition_manager = Arc::new(PartitionRuleManager::new(
            kv_backend.clone(),
            table_route_cache.clone(),
        ));

        let table_flownode_cache: TableFlownodeSetCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_FLOWNODE_SET_CACHE_NAME,
            })?;

        let inserter = Arc::new(Inserter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
            table_flownode_cache,
        ));

        let deleter = Arc::new(Deleter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
        ));

        let query_engine = flow_streaming_engine.query_engine.clone();

        let statement_executor = Arc::new(StatementExecutor::new(
            catalog_manager.clone(),
            query_engine.clone(),
            procedure_executor.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            inserter.clone(),
            table_route_cache,
        ));

        let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
        Ok(invoker)
    }
}

impl FrontendInvoker {
    pub async fn row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["output_insert"])
            .start_timer();

        self.inserter
            .handle_row_inserts(requests, ctx, &self.statement_executor)
            .await
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }

    pub async fn row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["output_delete"])
            .start_timer();

        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }

    pub fn statement_executor(&self) -> Arc<StatementExecutor> {
        self.statement_executor.clone()
    }
}

/// Get all flow ids on this flownode. If `nodeid` is given, only flows
/// assigned to that node are listed; otherwise flows from all catalogs are
/// collected.
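///
/// A minimal call sketch (the managers are assumed to be already constructed):
///
/// ```ignore
/// let ids = get_all_flow_ids(&flow_metadata_manager, &catalog_manager, Some(1)).await?;
/// ```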
pub(crate) async fn get_all_flow_ids(
    flow_metadata_manager: &FlowMetadataManagerRef,
    catalog_manager: &CatalogManagerRef,
    nodeid: Option<u64>,
) -> Result<Vec<u32>, Error> {
    let ret = if let Some(nodeid) = nodeid {
        let flow_ids_one_node = flow_metadata_manager
            .flownode_flow_manager()
            .flows(nodeid)
            .try_collect::<Vec<_>>()
            .await
            .context(ListFlowsSnafu { id: Some(nodeid) })?;
        flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
    } else {
        let all_catalogs = catalog_manager
            .catalog_names()
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;
        let mut all_flow_ids = vec![];
        for catalog in all_catalogs {
            let flows = flow_metadata_manager
                .flow_name_manager()
                .flow_names(&catalog)
                .await
                .try_collect::<Vec<_>>()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;

            all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
        }
        all_flow_ids
    };

    Ok(ret)
}