servers/grpc/
frontend_grpc_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::frontend::frontend_server::Frontend;
16use api::v1::frontend::{
17    KillProcessRequest, KillProcessResponse, ListProcessRequest, ListProcessResponse,
18};
19use catalog::process_manager::ProcessManagerRef;
20use common_telemetry::error;
21use tonic::{Code, Request, Response, Status};
22
23#[derive(Clone)]
24pub struct FrontendGrpcHandler {
25    process_manager: ProcessManagerRef,
26}
27
28impl FrontendGrpcHandler {
29    pub fn new(process_manager: ProcessManagerRef) -> Self {
30        Self { process_manager }
31    }
32}
33
34#[async_trait::async_trait]
35impl Frontend for FrontendGrpcHandler {
36    async fn list_process(
37        &self,
38        request: Request<ListProcessRequest>,
39    ) -> Result<Response<ListProcessResponse>, Status> {
40        let list_process_request = request.into_inner();
41        let catalog = if list_process_request.catalog.is_empty() {
42            None
43        } else {
44            Some(list_process_request.catalog.as_str())
45        };
46        let processes = self.process_manager.local_processes(catalog).map_err(|e| {
47            error!(e; "Failed to handle list process request");
48            Status::new(Code::Internal, e.to_string())
49        })?;
50        Ok(Response::new(ListProcessResponse { processes }))
51    }
52
53    async fn kill_process(
54        &self,
55        request: Request<KillProcessRequest>,
56    ) -> Result<Response<KillProcessResponse>, Status> {
57        let req = request.into_inner();
58        let success = self
59            .process_manager
60            .kill_process(req.server_addr, req.catalog, req.process_id)
61            .await
62            .map_err(|e| {
63                error!(e; "Failed to handle kill process request");
64                Status::new(Code::Internal, e.to_string())
65            })?;
66
67        Ok(Response::new(KillProcessResponse { success }))
68    }
69}