// flow/server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of grpc service for flow node
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use api::v1::flow::DirtyWindowRequests;
21use api::v1::{RowDeleteRequests, RowInsertRequests};
22use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_error::ext::BoxedError;
26use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
27use common_meta::key::TableMetadataManagerRef;
28use common_meta::key::flow::FlowMetadataManagerRef;
29use common_meta::kv_backend::KvBackendRef;
30use common_meta::node_manager::{Flownode, NodeManagerRef};
31use common_meta::procedure_executor::ProcedureExecutorRef;
32use common_query::Output;
33use common_runtime::JoinHandle;
34use common_telemetry::tracing::info;
35use futures::TryStreamExt;
36use greptime_proto::v1::flow::{FlowRequest, FlowResponse, InsertRequests, flow_server};
37use itertools::Itertools;
38use operator::delete::Deleter;
39use operator::insert::Inserter;
40use operator::statement::StatementExecutor;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use query::{QueryEngine, QueryEngineFactory};
44use servers::add_service;
45use servers::grpc::builder::GrpcServerBuilder;
46use servers::grpc::{GrpcServer, GrpcServerConfig};
47use servers::http::HttpServerBuilder;
48use servers::metrics_handler::MetricsHandler;
49use servers::server::{ServerHandler, ServerHandlers};
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt};
52use tokio::sync::{Mutex, broadcast, oneshot};
53use tonic::codec::CompressionEncoding;
54use tonic::{Request, Response, Status};
55
56use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
57use crate::adapter::{FlowStreamingEngineRef, create_worker};
58use crate::batching_mode::engine::BatchingEngine;
59use crate::error::{
60    CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
61    ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
62};
63use crate::heartbeat::HeartbeatTask;
64use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
65use crate::transform::register_function_to_query_engine;
66use crate::utils::{SizeReportSender, StateReportHandler};
67use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
68
/// Name under which the flow node server is registered in the server handlers.
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// Wrapper around the flow node manager; exists to avoid the orphan rule,
/// since the gRPC service trait cannot be implemented on `Arc<...>` directly.
#[derive(Clone)]
pub struct FlowService {
    /// Dual engine combining the streaming and batching flow engines.
    pub dual_engine: FlowDualEngineRef,
}
75
76impl FlowService {
77    pub fn new(manager: FlowDualEngineRef) -> Self {
78        Self {
79            dual_engine: manager,
80        }
81    }
82}
83
84#[async_trait::async_trait]
85impl flow_server::Flow for FlowService {
86    async fn handle_create_remove(
87        &self,
88        request: Request<FlowRequest>,
89    ) -> Result<Response<FlowResponse>, Status> {
90        let _timer = METRIC_FLOW_PROCESSING_TIME
91            .with_label_values(&["ddl"])
92            .start_timer();
93
94        let request = request.into_inner();
95        self.dual_engine
96            .handle(request)
97            .await
98            .map_err(|err| {
99                common_telemetry::error!(err; "Failed to handle flow request");
100                err
101            })
102            .map(Response::new)
103            .map_err(to_status_with_last_err)
104    }
105
106    async fn handle_mirror_request(
107        &self,
108        request: Request<InsertRequests>,
109    ) -> Result<Response<FlowResponse>, Status> {
110        let _timer = METRIC_FLOW_PROCESSING_TIME
111            .with_label_values(&["insert"])
112            .start_timer();
113
114        let request = request.into_inner();
115        // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
116        let mut row_count = 0;
117        let request = api::v1::region::InsertRequests {
118            requests: request
119                .requests
120                .into_iter()
121                .map(|insert| {
122                    insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
123                    api::v1::region::InsertRequest {
124                        region_id: insert.region_id,
125                        rows: insert.rows,
126                        partition_expr_version: insert.partition_expr_version,
127                    }
128                })
129                .collect_vec(),
130        };
131
132        METRIC_FLOW_ROWS
133            .with_label_values(&["in"])
134            .inc_by(row_count as u64);
135
136        self.dual_engine
137            .handle_inserts(request)
138            .await
139            .map(Response::new)
140            .map_err(to_status_with_last_err)
141    }
142
143    async fn handle_mark_dirty_time_window(
144        &self,
145        reqs: Request<DirtyWindowRequests>,
146    ) -> Result<Response<FlowResponse>, Status> {
147        self.dual_engine
148            .handle_mark_window_dirty(reqs.into_inner())
149            .await
150            .map(Response::new)
151            .map_err(to_status_with_last_err)
152    }
153}
154
/// The flow node server; cheap to clone, since all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct FlownodeServer {
    inner: Arc<FlownodeServerInner>,
}
159
/// FlownodeServerInner is the inner state of [`FlownodeServer`]; this struct
/// is mostly useful for constructing, starting, and stopping the flow node
/// server.
struct FlownodeServerInner {
    /// Worker shutdown signal; not to be confused with `server_shutdown_tx`.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Server shutdown signal used to shut down the gRPC server.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Handle of the background streaming computation task.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the state report task; deliberately kept alive across worker
    /// restarts (see `stop_workers`) so the report channel receiver survives.
    state_report_task_handler: Mutex<Option<JoinHandle<()>>>,
    /// The gRPC service implementation holding the dual flow engine.
    flow_service: FlowService,
}
174
impl FlownodeServer {
    /// Creates a new [`FlownodeServer`] with fresh shutdown channels and no
    /// background tasks running yet.
    pub fn new(flow_service: FlowService) -> Self {
        let (tx, _rx) = broadcast::channel::<()>(1);
        let (server_tx, _server_rx) = broadcast::channel::<()>(1);
        Self {
            inner: Arc::new(FlownodeServerInner {
                flow_service,
                worker_shutdown_tx: Mutex::new(tx),
                server_shutdown_tx: Mutex::new(server_tx),
                streaming_task_handler: Mutex::new(None),
                state_report_task_handler: Mutex::new(None),
            }),
        }
    }

    /// Start the background task for streaming computation.
    ///
    /// Should be called only after heartbeat is established, hence can get cluster info
    async fn start_workers(&self) -> Result<(), Error> {
        let manager_ref = self.inner.flow_service.dual_engine.clone();
        // Only start the state report task once; on a restart the previous
        // handle (and its channel receiver) is still alive, so this is a no-op.
        let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
        if state_report_task_handler.is_none() {
            *state_report_task_handler = manager_ref.clone().start_state_report_task().await;
        }
        // Release the lock before acquiring the other locks below.
        drop(state_report_task_handler);
        // Subscribe to the worker shutdown channel so `stop_workers` can signal
        // the background streaming task to exit.
        let handle = manager_ref
            .streaming_engine()
            .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
        self.inner
            .streaming_task_handler
            .lock()
            .await
            .replace(handle);

        self.inner
            .flow_service
            .dual_engine
            .start_flow_consistent_check_task()
            .await?;

        Ok(())
    }

    /// Stop the background task for streaming computation.
    async fn stop_workers(&self) -> Result<(), Error> {
        let tx = self.inner.worker_shutdown_tx.lock().await;
        // A send error only means there is no live subscriber, i.e. the
        // workers already exited; that is not a failure.
        if tx.send(()).is_err() {
            info!("Receiver dropped, the flow node server has already shutdown");
        }
        // Keep state_report_task_handler alive across worker restarts.
        // Dropping it here would permanently lose the report channel receiver.
        self.inner
            .flow_service
            .dual_engine
            .stop_flow_consistent_check_task()
            .await?;
        Ok(())
    }
}
234
impl FlownodeServer {
    /// Builds the tonic service wrapper around this server's [`FlowService`],
    /// accepting and sending both gzip- and zstd-compressed messages.
    pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
        flow_server::FlowServer::new(self.inner.flow_service.clone())
            .accept_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Zstd)
    }
}
244
/// The flownode server instance.
pub struct FlownodeInstance {
    /// The gRPC service plus background worker management.
    flownode_server: FlownodeServer,
    /// Registered service handlers (gRPC/HTTP) started and stopped together.
    services: ServerHandlers,
    /// Optional heartbeat task; started in `start` and stopped in `shutdown`.
    heartbeat_task: Option<HeartbeatTask>,
}
251
252impl FlownodeInstance {
253    pub async fn start(&mut self) -> Result<(), crate::Error> {
254        if let Some(task) = &self.heartbeat_task {
255            task.start().await?;
256        }
257
258        self.flownode_server.start_workers().await?;
259
260        self.services.start_all().await.context(StartServerSnafu)?;
261
262        Ok(())
263    }
264    pub async fn shutdown(&mut self) -> Result<(), Error> {
265        self.services
266            .shutdown_all()
267            .await
268            .context(ShutdownServerSnafu)?;
269
270        self.flownode_server.stop_workers().await?;
271
272        if let Some(task) = &self.heartbeat_task {
273            task.shutdown();
274        }
275
276        Ok(())
277    }
278
279    pub fn flownode_server(&self) -> &FlownodeServer {
280        &self.flownode_server
281    }
282
283    pub fn flow_engine(&self) -> FlowDualEngineRef {
284        self.flownode_server.inner.flow_service.dual_engine.clone()
285    }
286
287    pub fn setup_services(&mut self, services: ServerHandlers) {
288        self.services = services;
289    }
290}
291
292pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
293    if let Some(user_provider) = fn_opts.user_provider.as_ref() {
294        let static_provider = auth::static_user_provider_from_option(user_provider)
295            .context(IllegalAuthConfigSnafu)?;
296
297        let (usr, pwd) = static_provider
298            .get_one_user_pwd()
299            .context(IllegalAuthConfigSnafu)?;
300        let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
301        return Ok(Some(auth_header));
302    }
303
304    Ok(None)
305}
306
/// [`FlownodeInstance`] Builder
pub struct FlownodeBuilder {
    /// Flownode configuration options.
    opts: FlownodeOptions,
    /// Plugins shared with the rest of the node.
    plugins: Plugins,
    /// Table metadata manager.
    table_meta: TableMetadataManagerRef,
    /// Catalog manager used to resolve table sources.
    catalog_manager: CatalogManagerRef,
    /// Flow metadata manager.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Optional heartbeat task; set via [`Self::with_heartbeat_task`].
    heartbeat_task: Option<HeartbeatTask>,
    /// receive a oneshot sender to send state size report
    state_report_handler: Option<StateReportHandler>,
    /// Client for talking to the frontend, used by the batching engine.
    frontend_client: Arc<FrontendClient>,
}
319
320impl FlownodeBuilder {
321    /// init flownode builder
322    pub fn new(
323        opts: FlownodeOptions,
324        plugins: Plugins,
325        table_meta: TableMetadataManagerRef,
326        catalog_manager: CatalogManagerRef,
327        flow_metadata_manager: FlowMetadataManagerRef,
328        frontend_client: Arc<FrontendClient>,
329    ) -> Self {
330        Self {
331            opts,
332            plugins,
333            table_meta,
334            catalog_manager,
335            flow_metadata_manager,
336            heartbeat_task: None,
337            state_report_handler: None,
338            frontend_client,
339        }
340    }
341
342    pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
343        let (sender, receiver) = SizeReportSender::new();
344        Self {
345            heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
346            state_report_handler: Some(receiver),
347            ..self
348        }
349    }
350
351    pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
352        // TODO(discord9): does this query engine need those?
353        let query_engine_factory = QueryEngineFactory::new_with_plugins(
354            // query engine in flownode is only used for translate plan with resolved table source.
355            self.catalog_manager.clone(),
356            None,
357            None,
358            None,
359            None,
360            None,
361            false,
362            Default::default(),
363            self.opts.query.clone(),
364        );
365        let manager = Arc::new(
366            self.build_manager(query_engine_factory.query_engine())
367                .await?,
368        );
369        let batching = Arc::new(BatchingEngine::new(
370            self.frontend_client.clone(),
371            query_engine_factory.query_engine(),
372            self.flow_metadata_manager.clone(),
373            self.table_meta.clone(),
374            self.catalog_manager.clone(),
375            self.opts.flow.batching_mode.clone(),
376        ));
377        let dual = Arc::new(FlowDualEngine::new(
378            manager.clone(),
379            batching,
380            self.flow_metadata_manager.clone(),
381            self.catalog_manager.clone(),
382            self.plugins.clone(),
383        ));
384        if let Some(handler) = self.state_report_handler.take() {
385            dual.set_state_report_handler(handler).await;
386        }
387
388        let server = FlownodeServer::new(FlowService::new(dual));
389
390        let heartbeat_task = self.heartbeat_task;
391
392        let instance = FlownodeInstance {
393            flownode_server: server,
394            services: ServerHandlers::default(),
395            heartbeat_task,
396        };
397        Ok(instance)
398    }
399
400    /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
401    /// nor does it actually start running the worker.
402    async fn build_manager(
403        &mut self,
404        query_engine: Arc<dyn QueryEngine>,
405    ) -> Result<StreamingEngine, Error> {
406        let table_meta = self.table_meta.clone();
407
408        register_function_to_query_engine(&query_engine);
409
410        let num_workers = self.opts.flow.num_workers;
411
412        let node_id = self.opts.node_id.map(|id| id as u32);
413
414        let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
415        for worker_id in 0..num_workers {
416            let (tx, rx) = oneshot::channel();
417
418            let _handle = std::thread::Builder::new()
419                .name(format!("flow-worker-{}", worker_id))
420                .spawn(move || {
421                    let (handle, mut worker) = create_worker();
422                    let _ = tx.send(handle);
423                    info!("Flow Worker started in new thread");
424                    worker.run();
425                });
426            let worker_handle = rx.await.map_err(|e| {
427                UnexpectedSnafu {
428                    reason: format!("Failed to receive worker handle: {}", e),
429                }
430                .build()
431            })?;
432            man.add_worker_handle(worker_handle);
433        }
434        info!("Flow Node Manager started");
435        Ok(man)
436    }
437}
438
/// Builder for the flownode's service handlers (gRPC and optionally HTTP).
/// Useful in distributed mode
pub struct FlownodeServiceBuilder<'a> {
    /// Options used to derive bind addresses and server configuration.
    opts: &'a FlownodeOptions,
    /// Pre-built gRPC server, if any.
    grpc_server: Option<GrpcServer>,
    /// Whether to also expose the HTTP (metrics) service.
    enable_http_service: bool,
}
445
446impl<'a> FlownodeServiceBuilder<'a> {
447    pub fn new(opts: &'a FlownodeOptions) -> Self {
448        Self {
449            opts,
450            grpc_server: None,
451            enable_http_service: false,
452        }
453    }
454
455    pub fn enable_http_service(self) -> Self {
456        Self {
457            enable_http_service: true,
458            ..self
459        }
460    }
461
462    pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
463        Self {
464            grpc_server: Some(grpc_server),
465            ..self
466        }
467    }
468
469    pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
470        let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
471        self.grpc_server = Some(grpc_server);
472        self
473    }
474
475    pub fn build(mut self) -> Result<ServerHandlers, Error> {
476        let handlers = ServerHandlers::default();
477        if let Some(grpc_server) = self.grpc_server.take() {
478            let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
479                addr: &self.opts.grpc.bind_addr,
480            })?;
481            let handler: ServerHandler = (Box::new(grpc_server), addr);
482            handlers.insert(handler);
483        }
484
485        if self.enable_http_service {
486            let http_server = HttpServerBuilder::new(self.opts.http.clone())
487                .with_metrics_handler(MetricsHandler)
488                .build();
489            let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
490                addr: &self.opts.http.addr,
491            })?;
492            let handler: ServerHandler = (Box::new(http_server), addr);
493            handlers.insert(handler);
494        }
495        Ok(handlers)
496    }
497
498    pub fn grpc_server_builder(
499        opts: &FlownodeOptions,
500        flownode_server: &FlownodeServer,
501    ) -> GrpcServerBuilder {
502        let config = GrpcServerConfig {
503            max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
504            max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
505            tls: opts.grpc.tls.clone(),
506            max_connection_age: opts.grpc.max_connection_age,
507        };
508        let service = flownode_server.create_flow_service();
509        let runtime = common_runtime::global_runtime();
510        let mut builder = GrpcServerBuilder::new(config, runtime);
511        add_service!(builder, service);
512        builder
513    }
514}
515
/// Basically a tiny frontend that communicates with datanodes directly.
/// Unlike [`FrontendClient`], which connects to a real frontend, this is used
/// by the flow's streaming engine and is intended for simple queries.
///
/// For heavy queries use [`FrontendClient`], which offloads computation to
/// the frontend, lifting the load from the flownode.
#[derive(Clone)]
pub struct FrontendInvoker {
    /// Routes row inserts to datanodes.
    inserter: Arc<Inserter>,
    /// Routes row deletes to datanodes.
    deleter: Arc<Deleter>,
    /// Statement executor; also passed to the inserter when handling inserts.
    statement_executor: Arc<StatementExecutor>,
}
526
impl FrontendInvoker {
    /// Creates an invoker from already-built components.
    pub fn new(
        inserter: Arc<Inserter>,
        deleter: Arc<Deleter>,
        statement_executor: Arc<StatementExecutor>,
    ) -> Self {
        Self {
            inserter,
            deleter,
            statement_executor,
        }
    }

    /// Wires up a [`FrontendInvoker`] from node-level components.
    ///
    /// Resolves the table-route, partition-info, and table-flownode-set caches
    /// from `layered_cache_registry`, then assembles the partition manager,
    /// inserter, deleter, and statement executor on top of them.
    ///
    /// # Errors
    /// Fails with a `CacheRequired` error if any required cache is missing
    /// from the registry.
    pub async fn build_from(
        flow_streaming_engine: FlowStreamingEngineRef,
        catalog_manager: CatalogManagerRef,
        kv_backend: KvBackendRef,
        layered_cache_registry: LayeredCacheRegistryRef,
        procedure_executor: ProcedureExecutorRef,
        node_manager: NodeManagerRef,
    ) -> Result<FrontendInvoker, Error> {
        let table_route_cache: TableRouteCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_ROUTE_CACHE_NAME,
            })?;
        let partition_info_cache: PartitionInfoCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: PARTITION_INFO_CACHE_NAME,
            })?;

        let partition_manager = Arc::new(PartitionRuleManager::new(
            kv_backend.clone(),
            table_route_cache.clone(),
            partition_info_cache.clone(),
        ));

        let table_flownode_cache: TableFlownodeSetCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_FLOWNODE_SET_CACHE_NAME,
            })?;

        let inserter = Arc::new(Inserter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
            table_flownode_cache,
        ));

        let deleter = Arc::new(Deleter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
        ));

        // Reuse the streaming engine's query engine rather than building a new one.
        let query_engine = flow_streaming_engine.query_engine.clone();

        let statement_executor = Arc::new(StatementExecutor::new(
            catalog_manager.clone(),
            query_engine.clone(),
            procedure_executor.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            inserter.clone(),
            partition_manager,
            None,
        ));

        let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
        Ok(invoker)
    }
}
598
599impl FrontendInvoker {
600    pub async fn row_inserts(
601        &self,
602        requests: RowInsertRequests,
603        ctx: QueryContextRef,
604    ) -> common_frontend::error::Result<Output> {
605        let _timer = METRIC_FLOW_PROCESSING_TIME
606            .with_label_values(&["output_insert"])
607            .start_timer();
608
609        self.inserter
610            .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
611            .await
612            .map_err(BoxedError::new)
613            .context(common_frontend::error::ExternalSnafu)
614    }
615
616    pub async fn row_deletes(
617        &self,
618        requests: RowDeleteRequests,
619        ctx: QueryContextRef,
620    ) -> common_frontend::error::Result<Output> {
621        let _timer = METRIC_FLOW_PROCESSING_TIME
622            .with_label_values(&["output_delete"])
623            .start_timer();
624
625        self.deleter
626            .handle_row_deletes(requests, ctx)
627            .await
628            .map_err(BoxedError::new)
629            .context(common_frontend::error::ExternalSnafu)
630    }
631
632    pub fn statement_executor(&self) -> Arc<StatementExecutor> {
633        self.statement_executor.clone()
634    }
635}
636
637/// get all flow ids in this flownode
638pub(crate) async fn get_all_flow_ids(
639    flow_metadata_manager: &FlowMetadataManagerRef,
640    catalog_manager: &CatalogManagerRef,
641    nodeid: Option<u64>,
642) -> Result<Vec<u32>, Error> {
643    let ret = if let Some(nodeid) = nodeid {
644        let flow_ids_one_node = flow_metadata_manager
645            .flownode_flow_manager()
646            .flows(nodeid)
647            .try_collect::<Vec<_>>()
648            .await
649            .context(ListFlowsSnafu { id: Some(nodeid) })?;
650        flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
651    } else {
652        let all_catalogs = catalog_manager
653            .catalog_names()
654            .await
655            .map_err(BoxedError::new)
656            .context(ExternalSnafu)?;
657        let mut all_flow_ids = vec![];
658        for catalog in all_catalogs {
659            let flows = flow_metadata_manager
660                .flow_name_manager()
661                .flow_names(&catalog)
662                .await
663                .try_collect::<Vec<_>>()
664                .await
665                .map_err(BoxedError::new)
666                .context(ExternalSnafu)?;
667
668            all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
669        }
670        all_flow_ids
671    };
672
673    Ok(ret)
674}
675
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use catalog::memory::new_memory_catalog_manager;
    use common_base::Plugins;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use query::options::QueryOptions;

    use super::*;
    use crate::adapter::flownode_impl::FlowDualEngine;
    use crate::batching_mode::BatchingModeOptions;
    use crate::batching_mode::engine::BatchingEngine;
    use crate::utils::SizeReportSender;

    /// Builds a [`FlownodeServer`] on top of fully in-memory metadata stores,
    /// returning it together with the sender half of the state-size report
    /// channel so tests can exercise the report path.
    async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        table_meta.init().await.unwrap();
        let flow_meta = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let catalog_manager = new_memory_catalog_manager().unwrap();
        let query_engine = crate::test_utils::create_test_query_engine();

        let streaming_engine = Arc::new(StreamingEngine::new(
            None,
            query_engine.clone(),
            table_meta.clone(),
        ));
        let (frontend_client, _handler) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        let batching_engine = Arc::new(BatchingEngine::new(
            Arc::new(frontend_client),
            query_engine,
            flow_meta.clone(),
            table_meta,
            catalog_manager.clone(),
            BatchingModeOptions::default(),
        ));
        let dual_engine = Arc::new(FlowDualEngine::new(
            streaming_engine,
            batching_engine,
            flow_meta,
            catalog_manager,
            Plugins::new(),
        ));

        // Install the report handler before the server starts any workers, so
        // `start_workers` picks it up when starting the state report task.
        let (report_sender, report_handler) = SizeReportSender::new();
        dual_engine.set_state_report_handler(report_handler).await;

        let server = FlownodeServer::new(FlowService::new(dual_engine));
        (server, report_sender)
    }

    /// Regression test: the state report channel must keep responding across
    /// a stop/start cycle of the workers (see `stop_workers`, which
    /// deliberately keeps `state_report_task_handler` alive).
    #[tokio::test]
    async fn test_state_report_handler_survives_worker_restart() {
        let (server, report_sender) = new_test_flownode_server().await;

        server.start_workers().await.unwrap();
        report_sender.query(Duration::from_secs(3)).await.unwrap();

        server.stop_workers().await.unwrap();
        report_sender.query(Duration::from_secs(3)).await.unwrap();

        server.start_workers().await.unwrap();
        report_sender.query(Duration::from_secs(3)).await.unwrap();

        server.stop_workers().await.unwrap();
    }
}