flow/server.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implementation of the gRPC service for the flow node.

use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info;
use futures::TryStreamExt;
use greptime_proto::v1::flow::{FlowRequest, FlowResponse, InsertRequests, flow_server};
use itertools::Itertools;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use servers::add_service;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use tokio::sync::{Mutex, broadcast, oneshot};
use tonic::codec::CompressionEncoding;
use tonic::{Request, Response, Status};

use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{FlowStreamingEngineRef, create_worker};
use crate::batching_mode::engine::BatchingEngine;
use crate::error::{
    CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
    ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};

pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// Wraps the flow node manager to work around the orphan rule when implementing
/// traits on `Arc<...>`.
#[derive(Clone)]
pub struct FlowService {
    pub dual_engine: FlowDualEngineRef,
}

impl FlowService {
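    /// Creates a new [`FlowService`] backed by the given dual engine.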
    pub fn new(manager: FlowDualEngineRef) -> Self {
        Self {
            dual_engine: manager,
        }
    }
}

#[async_trait::async_trait]
impl flow_server::Flow for FlowService {
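    /// Handles flow DDL requests such as creating or removing flows.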
    async fn handle_create_remove(
        &self,
        request: Request<FlowRequest>,
    ) -> Result<Response<FlowResponse>, Status> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["ddl"])
            .start_timer();

        let request = request.into_inner();
        self.dual_engine
            .handle(request)
            .await
            .map_err(|err| {
                common_telemetry::error!(err; "Failed to handle flow request");
                err
            })
            .map(Response::new)
            .map_err(to_status_with_last_err)
    }

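    /// Handles mirrored insert requests and forwards them to the flow engine.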
    async fn handle_mirror_request(
        &self,
        request: Request<InsertRequests>,
    ) -> Result<Response<FlowResponse>, Status> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["insert"])
            .start_timer();

        let request = request.into_inner();
        // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
        let mut row_count = 0;
        let request = api::v1::region::InsertRequests {
            requests: request
                .requests
                .into_iter()
                .map(|insert| {
                    insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
                    api::v1::region::InsertRequest {
                        region_id: insert.region_id,
                        rows: insert.rows,
                    }
                })
                .collect_vec(),
        };

        METRIC_FLOW_ROWS
            .with_label_values(&["in"])
            .inc_by(row_count as u64);

        self.dual_engine
            .handle_inserts(request)
            .await
            .map(Response::new)
            .map_err(to_status_with_last_err)
    }

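    /// Marks the given time windows as dirty by delegating to the dual engine.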
    async fn handle_mark_dirty_time_window(
        &self,
        reqs: Request<DirtyWindowRequests>,
    ) -> Result<Response<FlowResponse>, Status> {
        self.dual_engine
            .handle_mark_window_dirty(reqs.into_inner())
            .await
            .map(Response::new)
            .map_err(to_status_with_last_err)
    }
}

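/// The flownode gRPC server; cloning it shares the same inner state.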
#[derive(Clone)]
pub struct FlownodeServer {
    inner: Arc<FlownodeServerInner>,
}

/// Inner state of [`FlownodeServer`]; mostly used to construct, start, and stop
/// the flow node server.
struct FlownodeServerInner {
    /// Worker shutdown signal, not to be confused with `server_shutdown_tx`.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Server shutdown signal used to shut down the gRPC server.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Handle to the background streaming task.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
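    /// The flow gRPC service implementation.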
    flow_service: FlowService,
}

impl FlownodeServer {
    pub fn new(flow_service: FlowService) -> Self {
        let (tx, _rx) = broadcast::channel::<()>(1);
        let (server_tx, _server_rx) = broadcast::channel::<()>(1);
        Self {
            inner: Arc::new(FlownodeServerInner {
                flow_service,
                worker_shutdown_tx: Mutex::new(tx),
                server_shutdown_tx: Mutex::new(server_tx),
                streaming_task_handler: Mutex::new(None),
            }),
        }
    }

    /// Start the background task for streaming computation.
    ///
    /// Should only be called after the heartbeat is established, so that cluster
    /// info is available.
    async fn start_workers(&self) -> Result<(), Error> {
        let manager_ref = self.inner.flow_service.dual_engine.clone();
        let handle = manager_ref
            .streaming_engine()
            .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
        self.inner
            .streaming_task_handler
            .lock()
            .await
            .replace(handle);

        self.inner
            .flow_service
            .dual_engine
            .start_flow_consistent_check_task()
            .await?;

        Ok(())
    }

    /// Stop the background task for streaming computation.
    async fn stop_workers(&self) -> Result<(), Error> {
        let tx = self.inner.worker_shutdown_tx.lock().await;
        if tx.send(()).is_err() {
            info!("Receiver dropped, the flow node server has already shutdown");
        }
        self.inner
            .flow_service
            .dual_engine
            .stop_flow_consistent_check_task()
            .await?;
        Ok(())
    }
}

impl FlownodeServer {
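    /// Creates the tonic flow service with gzip and zstd compression enabled
    /// in both directions.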
    pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
        flow_server::FlowServer::new(self.inner.flow_service.clone())
            .accept_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Zstd)
    }
}

/// The flownode server instance.
pub struct FlownodeInstance {
    flownode_server: FlownodeServer,
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
}

impl FlownodeInstance {
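    /// Starts the heartbeat task (if any), the background workers, and all
    /// registered servers.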
    pub async fn start(&mut self) -> Result<(), crate::Error> {
        if let Some(task) = &self.heartbeat_task {
            task.start().await?;
        }

        self.flownode_server.start_workers().await?;

        self.services.start_all().await.context(StartServerSnafu)?;

        Ok(())
    }
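    /// Shuts down all servers, stops the background workers, and shuts down
    /// the heartbeat task if present.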
    pub async fn shutdown(&mut self) -> Result<(), Error> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        self.flownode_server.stop_workers().await?;

        if let Some(task) = &self.heartbeat_task {
            task.shutdown();
        }

        Ok(())
    }

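    /// Returns the underlying [`FlownodeServer`].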
    pub fn flownode_server(&self) -> &FlownodeServer {
        &self.flownode_server
    }

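    /// Returns the dual flow engine.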
    pub fn flow_engine(&self) -> FlowDualEngineRef {
        self.flownode_server.inner.flow_service.dual_engine.clone()
    }

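    /// Sets the server handlers to be started and stopped with this instance.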
    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }
}

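/// Builds the flow auth header from the statically configured user provider,
/// if one is set.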
pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
    if let Some(user_provider) = fn_opts.user_provider.as_ref() {
        let static_provider = auth::static_user_provider_from_option(user_provider)
            .context(IllegalAuthConfigSnafu)?;

        let (usr, pwd) = static_provider
            .get_one_user_pwd()
            .context(IllegalAuthConfigSnafu)?;
        let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
        return Ok(Some(auth_header));
    }

    Ok(None)
}

/// [`FlownodeInstance`] Builder
pub struct FlownodeBuilder {
    opts: FlownodeOptions,
    plugins: Plugins,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    heartbeat_task: Option<HeartbeatTask>,
    /// Receives oneshot senders used to send back state size reports.
    state_report_handler: Option<StateReportHandler>,
    frontend_client: Arc<FrontendClient>,
}

impl FlownodeBuilder {
    /// Creates a new flownode builder.
    pub fn new(
        opts: FlownodeOptions,
        plugins: Plugins,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        frontend_client: Arc<FrontendClient>,
    ) -> Self {
        Self {
            opts,
            plugins,
            table_meta,
            catalog_manager,
            flow_metadata_manager,
            heartbeat_task: None,
            state_report_handler: None,
            frontend_client,
        }
    }

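    /// Attaches a heartbeat task and sets up the channel used to report state sizes.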
    pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
        let (sender, receiver) = SizeReportSender::new();
        Self {
            heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
            state_report_handler: Some(receiver),
            ..self
        }
    }

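    /// Builds the [`FlownodeInstance`], wiring the streaming and batching engines
    /// into a [`FlowDualEngine`].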
    pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
        // TODO(discord9): does this query engine need those?
        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the flownode is only used to translate plans with resolved table sources.
            self.catalog_manager.clone(),
            None,
            None,
            None,
            None,
            None,
            false,
            Default::default(),
            self.opts.query.clone(),
        );
        let manager = Arc::new(
            self.build_manager(query_engine_factory.query_engine())
                .await?,
        );
        let batching = Arc::new(BatchingEngine::new(
            self.frontend_client.clone(),
            query_engine_factory.query_engine(),
            self.flow_metadata_manager.clone(),
            self.table_meta.clone(),
            self.catalog_manager.clone(),
            self.opts.flow.batching_mode.clone(),
        ));
        let dual = FlowDualEngine::new(
            manager.clone(),
            batching,
            self.flow_metadata_manager.clone(),
            self.catalog_manager.clone(),
            self.plugins.clone(),
        );

        let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));

        let heartbeat_task = self.heartbeat_task;

        let instance = FlownodeInstance {
            flownode_server: server,
            services: ServerHandlers::default(),
            heartbeat_task,
        };
        Ok(instance)
    }

    /// Builds the [`StreamingEngine`]. Note this doesn't take ownership of `self`,
    /// nor does it actually start running the workers.
    async fn build_manager(
        &mut self,
        query_engine: Arc<dyn QueryEngine>,
    ) -> Result<StreamingEngine, Error> {
        let table_meta = self.table_meta.clone();

        register_function_to_query_engine(&query_engine);

        let num_workers = self.opts.flow.num_workers;

        let node_id = self.opts.node_id.map(|id| id as u32);

        let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
        for worker_id in 0..num_workers {
            let (tx, rx) = oneshot::channel();

            let _handle = std::thread::Builder::new()
                .name(format!("flow-worker-{}", worker_id))
                .spawn(move || {
                    let (handle, mut worker) = create_worker();
                    let _ = tx.send(handle);
                    info!("Flow Worker started in new thread");
                    worker.run();
                });
            let worker_handle = rx.await.map_err(|e| {
                UnexpectedSnafu {
                    reason: format!("Failed to receive worker handle: {}", e),
                }
                .build()
            })?;
            man.add_worker_handle(worker_handle);
        }
        if let Some(handler) = self.state_report_handler.take() {
            man = man.with_state_report_handler(handler).await;
        }
        info!("Flow Node Manager started");
        Ok(man)
    }
}

/// Builder for the flownode's gRPC and HTTP services; mainly useful in distributed mode.
pub struct FlownodeServiceBuilder<'a> {
    opts: &'a FlownodeOptions,
    grpc_server: Option<GrpcServer>,
    enable_http_service: bool,
}

impl<'a> FlownodeServiceBuilder<'a> {
    pub fn new(opts: &'a FlownodeOptions) -> Self {
        Self {
            opts,
            grpc_server: None,
            enable_http_service: false,
        }
    }

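    /// Enables the HTTP service, which serves the metrics handler.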
    pub fn enable_http_service(self) -> Self {
        Self {
            enable_http_service: true,
            ..self
        }
    }

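    /// Uses the given gRPC server instead of building a default one.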
    pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
        Self {
            grpc_server: Some(grpc_server),
            ..self
        }
    }

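    /// Builds a default gRPC server for the given flownode server and attaches it.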
    pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
        let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
        self.grpc_server = Some(grpc_server);
        self
    }

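    /// Builds the server handlers, binding the gRPC and optional HTTP servers
    /// to their configured addresses.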
    pub fn build(mut self) -> Result<ServerHandlers, Error> {
        let handlers = ServerHandlers::default();
        if let Some(grpc_server) = self.grpc_server.take() {
            let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
                addr: &self.opts.grpc.bind_addr,
            })?;
            let handler: ServerHandler = (Box::new(grpc_server), addr);
            handlers.insert(handler);
        }

        if self.enable_http_service {
            let http_server = HttpServerBuilder::new(self.opts.http.clone())
                .with_metrics_handler(MetricsHandler)
                .build();
            let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
                addr: &self.opts.http.addr,
            })?;
            let handler: ServerHandler = (Box::new(http_server), addr);
            handlers.insert(handler);
        }
        Ok(handlers)
    }

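    /// Creates a gRPC server builder from the flownode options, with the flow
    /// service registered.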
    pub fn grpc_server_builder(
        opts: &FlownodeOptions,
        flownode_server: &FlownodeServer,
    ) -> GrpcServerBuilder {
        let config = GrpcServerConfig {
            max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
            max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
            max_total_message_memory: opts.grpc.max_total_message_memory.as_bytes() as usize,
            tls: opts.grpc.tls.clone(),
            max_connection_age: opts.grpc.max_connection_age,
        };
        let service = flownode_server.create_flow_service();
        let runtime = common_runtime::global_runtime();
        let mut builder = GrpcServerBuilder::new(config, runtime);
        add_service!(builder, service);
        builder
    }
}

/// Essentially a tiny frontend that communicates with datanodes directly, unlike
/// [`FrontendClient`], which connects to a real frontend. It is used by the flow
/// streaming engine for simple queries.
///
/// For heavy queries, use [`FrontendClient`], which offloads computation to the
/// frontend and lifts the load from the flownode.
#[derive(Clone)]
pub struct FrontendInvoker {
    inserter: Arc<Inserter>,
    deleter: Arc<Deleter>,
    statement_executor: Arc<StatementExecutor>,
}

impl FrontendInvoker {
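    /// Creates a new [`FrontendInvoker`] from its components.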
    pub fn new(
        inserter: Arc<Inserter>,
        deleter: Arc<Deleter>,
        statement_executor: Arc<StatementExecutor>,
    ) -> Self {
        Self {
            inserter,
            deleter,
            statement_executor,
        }
    }

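    /// Builds a [`FrontendInvoker`] by assembling an inserter, deleter, and statement
    /// executor from the given caches, kv backend, procedure executor, and node manager.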
    pub async fn build_from(
        flow_streaming_engine: FlowStreamingEngineRef,
        catalog_manager: CatalogManagerRef,
        kv_backend: KvBackendRef,
        layered_cache_registry: LayeredCacheRegistryRef,
        procedure_executor: ProcedureExecutorRef,
        node_manager: NodeManagerRef,
    ) -> Result<FrontendInvoker, Error> {
        let table_route_cache: TableRouteCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_ROUTE_CACHE_NAME,
            })?;

        let partition_manager = Arc::new(PartitionRuleManager::new(
            kv_backend.clone(),
            table_route_cache.clone(),
        ));

        let table_flownode_cache: TableFlownodeSetCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_FLOWNODE_SET_CACHE_NAME,
            })?;

        let inserter = Arc::new(Inserter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
            table_flownode_cache,
        ));

        let deleter = Arc::new(Deleter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
        ));

        let query_engine = flow_streaming_engine.query_engine.clone();

        let statement_executor = Arc::new(StatementExecutor::new(
            catalog_manager.clone(),
            query_engine.clone(),
            procedure_executor.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            inserter.clone(),
            table_route_cache,
            None,
        ));

        let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
        Ok(invoker)
    }
}

impl FrontendInvoker {
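    /// Handles row insert requests by forwarding them to the inserter.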
    pub async fn row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["output_insert"])
            .start_timer();

        self.inserter
            .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
            .await
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }

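    /// Handles row delete requests by forwarding them to the deleter.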
    pub async fn row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let _timer = METRIC_FLOW_PROCESSING_TIME
            .with_label_values(&["output_delete"])
            .start_timer();

        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }

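    /// Returns the statement executor.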
    pub fn statement_executor(&self) -> Arc<StatementExecutor> {
        self.statement_executor.clone()
    }
}

/// Gets all flow ids on this flownode.
pub(crate) async fn get_all_flow_ids(
    flow_metadata_manager: &FlowMetadataManagerRef,
    catalog_manager: &CatalogManagerRef,
    nodeid: Option<u64>,
) -> Result<Vec<u32>, Error> {
    let ret = if let Some(nodeid) = nodeid {
        let flow_ids_one_node = flow_metadata_manager
            .flownode_flow_manager()
            .flows(nodeid)
            .try_collect::<Vec<_>>()
            .await
            .context(ListFlowsSnafu { id: Some(nodeid) })?;
        flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
    } else {
        let all_catalogs = catalog_manager
            .catalog_names()
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;
        let mut all_flow_ids = vec![];
        for catalog in all_catalogs {
            let flows = flow_metadata_manager
                .flow_name_manager()
                .flow_names(&catalog)
                .await
                .try_collect::<Vec<_>>()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;

            all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
        }
        all_flow_ids
    };

    Ok(ret)
}