servers/grpc/
frontend_grpc_handler.rs1use api::v1::frontend::frontend_server::Frontend;
16use api::v1::frontend::{
17 KillProcessRequest, KillProcessResponse, ListProcessRequest, ListProcessResponse,
18};
19use catalog::process_manager::ProcessManagerRef;
20use common_telemetry::error;
21use tonic::{Code, Request, Response, Status};
22
23#[derive(Clone)]
24pub struct FrontendGrpcHandler {
25 process_manager: ProcessManagerRef,
26}
27
28impl FrontendGrpcHandler {
29 pub fn new(process_manager: ProcessManagerRef) -> Self {
30 Self { process_manager }
31 }
32}
33
34#[async_trait::async_trait]
35impl Frontend for FrontendGrpcHandler {
36 async fn list_process(
37 &self,
38 request: Request<ListProcessRequest>,
39 ) -> Result<Response<ListProcessResponse>, Status> {
40 let list_process_request = request.into_inner();
41 let catalog = if list_process_request.catalog.is_empty() {
42 None
43 } else {
44 Some(list_process_request.catalog.as_str())
45 };
46 let processes = self.process_manager.local_processes(catalog).map_err(|e| {
47 error!(e; "Failed to handle list process request");
48 Status::new(Code::Internal, e.to_string())
49 })?;
50 Ok(Response::new(ListProcessResponse { processes }))
51 }
52
53 async fn kill_process(
54 &self,
55 request: Request<KillProcessRequest>,
56 ) -> Result<Response<KillProcessResponse>, Status> {
57 let req = request.into_inner();
58 let success = self
59 .process_manager
60 .kill_process(req.server_addr, req.catalog, req.process_id)
61 .await
62 .map_err(|e| {
63 error!(e; "Failed to handle kill process request");
64 Status::new(Code::Internal, e.to_string())
65 })?;
66
67 Ok(Response::new(KillProcessResponse { success }))
68 }
69}