common_frontend/
selector.rs1use std::time::Duration;
16
17use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
18use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
19use greptime_proto::v1::frontend::{
20 frontend_client, KillProcessRequest, KillProcessResponse, ListProcessRequest,
21 ListProcessResponse,
22};
23use meta_client::MetaClientRef;
24use snafu::ResultExt;
25use tonic::Response;
26
27use crate::error;
28use crate::error::{MetaSnafu, Result};
29
30pub type FrontendClientPtr = Box<dyn FrontendClient>;
31
32#[async_trait::async_trait]
33pub trait FrontendClient: Send {
34 async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
35
36 async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;
37}
38
39#[async_trait::async_trait]
40impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
41 async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
42 frontend_client::FrontendClient::<tonic::transport::channel::Channel>::list_process(
43 self, req,
44 )
45 .await
46 .context(error::InvokeFrontendSnafu)
47 .map(Response::into_inner)
48 }
49
50 async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse> {
51 frontend_client::FrontendClient::<tonic::transport::channel::Channel>::kill_process(
52 self, req,
53 )
54 .await
55 .context(error::InvokeFrontendSnafu)
56 .map(Response::into_inner)
57 }
58}
59
60#[async_trait::async_trait]
61pub trait FrontendSelector {
62 async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
63 where
64 F: Fn(&NodeInfo) -> bool + Send;
65}
66
67#[derive(Debug, Clone)]
68pub struct MetaClientSelector {
69 meta_client: MetaClientRef,
70 channel_manager: ChannelManager,
71}
72
73#[async_trait::async_trait]
74impl FrontendSelector for MetaClientSelector {
75 async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
76 where
77 F: Fn(&NodeInfo) -> bool + Send,
78 {
79 let nodes = self
80 .meta_client
81 .list_nodes(Some(Role::Frontend))
82 .await
83 .map_err(Box::new)
84 .context(MetaSnafu)?;
85
86 nodes
87 .into_iter()
88 .filter(predicate)
89 .map(|node| {
90 let channel = self
91 .channel_manager
92 .get(node.peer.addr)
93 .context(error::CreateChannelSnafu)?;
94 let client = frontend_client::FrontendClient::new(channel);
95 Ok(Box::new(client) as FrontendClientPtr)
96 })
97 .collect::<Result<Vec<_>>>()
98 }
99}
100
101impl MetaClientSelector {
102 pub fn new(meta_client: MetaClientRef) -> Self {
103 let cfg = ChannelConfig::new()
104 .connect_timeout(Duration::from_secs(30))
105 .timeout(Duration::from_secs(30));
106 let channel_manager = ChannelManager::with_config(cfg);
107 Self {
108 meta_client,
109 channel_manager,
110 }
111 }
112}