common_frontend/
selector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
18use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
19use greptime_proto::v1::frontend::{
20    frontend_client, KillProcessRequest, KillProcessResponse, ListProcessRequest,
21    ListProcessResponse,
22};
23use meta_client::MetaClientRef;
24use snafu::ResultExt;
25use tonic::Response;
26
27use crate::error;
28use crate::error::{MetaSnafu, Result};
29
30pub type FrontendClientPtr = Box<dyn FrontendClient>;
31
32#[async_trait::async_trait]
33pub trait FrontendClient: Send {
34    async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
35
36    async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;
37}
38
39#[async_trait::async_trait]
40impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
41    async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
42        frontend_client::FrontendClient::<tonic::transport::channel::Channel>::list_process(
43            self, req,
44        )
45        .await
46        .context(error::InvokeFrontendSnafu)
47        .map(Response::into_inner)
48    }
49
50    async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse> {
51        frontend_client::FrontendClient::<tonic::transport::channel::Channel>::kill_process(
52            self, req,
53        )
54        .await
55        .context(error::InvokeFrontendSnafu)
56        .map(Response::into_inner)
57    }
58}
59
60#[async_trait::async_trait]
61pub trait FrontendSelector {
62    async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
63    where
64        F: Fn(&NodeInfo) -> bool + Send;
65}
66
67#[derive(Debug, Clone)]
68pub struct MetaClientSelector {
69    meta_client: MetaClientRef,
70    channel_manager: ChannelManager,
71}
72
73#[async_trait::async_trait]
74impl FrontendSelector for MetaClientSelector {
75    async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
76    where
77        F: Fn(&NodeInfo) -> bool + Send,
78    {
79        let nodes = self
80            .meta_client
81            .list_nodes(Some(Role::Frontend))
82            .await
83            .map_err(Box::new)
84            .context(MetaSnafu)?;
85
86        nodes
87            .into_iter()
88            .filter(predicate)
89            .map(|node| {
90                let channel = self
91                    .channel_manager
92                    .get(node.peer.addr)
93                    .context(error::CreateChannelSnafu)?;
94                let client = frontend_client::FrontendClient::new(channel);
95                Ok(Box::new(client) as FrontendClientPtr)
96            })
97            .collect::<Result<Vec<_>>>()
98    }
99}
100
101impl MetaClientSelector {
102    pub fn new(meta_client: MetaClientRef) -> Self {
103        let cfg = ChannelConfig::new()
104            .connect_timeout(Duration::from_secs(30))
105            .timeout(Duration::from_secs(30));
106        let channel_manager = ChannelManager::with_config(cfg);
107        Self {
108            meta_client,
109            channel_manager,
110        }
111    }
112}