meta_client/client/
config.rs1use std::sync::Arc;
16
17use api::v1::meta::config_client::ConfigClient;
18use api::v1::meta::{PullConfigRequest, PullConfigResponse, RequestHeader, Role};
19use common_grpc::channel_manager::ChannelManager;
20use common_meta::util;
21use common_telemetry::tracing_context::TracingContext;
22use snafu::{OptionExt, ResultExt, ensure};
23use tokio::sync::RwLock;
24use tonic::codec::CompressionEncoding;
25use tonic::transport::Channel;
26
27use crate::client::{Id, LeaderProviderRef};
28use crate::error;
29use crate::error::{InvalidResponseHeaderSnafu, Result};
30
31#[derive(Clone, Debug)]
32pub struct Client {
33 inner: Arc<RwLock<Inner>>,
34}
35
36impl Client {
37 pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
38 let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
39 Self { inner }
40 }
41
42 pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
43 let mut inner = self.inner.write().await;
44 inner.start_with(leader_provider)
45 }
46
47 pub async fn pull_config(&self) -> Result<PullConfigResponse> {
48 let inner = self.inner.read().await;
49 inner.ask_leader().await?;
50 inner.pull_config().await
51 }
52}
53
54#[derive(Debug)]
55struct Inner {
56 id: Id,
57 role: Role,
58 channel_manager: ChannelManager,
59 leader_provider: Option<LeaderProviderRef>,
60}
61
62impl Inner {
63 fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
64 Self {
65 id,
66 role,
67 channel_manager,
68 leader_provider: None,
69 }
70 }
71
72 fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
73 ensure!(
74 !self.is_started(),
75 error::IllegalGrpcClientStateSnafu {
76 err_msg: "Config client already started"
77 }
78 );
79 self.leader_provider = Some(leader_provider);
80 Ok(())
81 }
82
83 async fn ask_leader(&self) -> Result<String> {
84 let Some(leader_provider) = self.leader_provider.as_ref() else {
85 return error::IllegalGrpcClientStateSnafu {
86 err_msg: "not started",
87 }
88 .fail();
89 };
90 leader_provider.ask_leader().await
91 }
92
93 async fn pull_config(&self) -> Result<PullConfigResponse> {
94 ensure!(
95 self.is_started(),
96 error::IllegalGrpcClientStateSnafu {
97 err_msg: "Config client not start"
98 }
99 );
100
101 let leader_addr = self
102 .leader_provider
103 .as_ref()
104 .unwrap()
105 .leader()
106 .context(error::NoLeaderSnafu)?;
107 let mut client = self.make_client(&leader_addr)?;
108
109 let header = RequestHeader::new(
110 self.id,
111 self.role,
112 TracingContext::from_current_span().to_w3c(),
113 );
114 let req = PullConfigRequest {
115 header: Some(header),
116 };
117
118 let res = client
119 .pull_config(req)
120 .await
121 .map_err(error::Error::from)?
122 .into_inner();
123
124 util::check_response_header(res.header.as_ref()).context(InvalidResponseHeaderSnafu)?;
125
126 Ok(res)
127 }
128
129 fn make_client(&self, addr: impl AsRef<str>) -> Result<ConfigClient<Channel>> {
130 let channel = self
131 .channel_manager
132 .get(addr)
133 .context(error::CreateChannelSnafu)?;
134
135 Ok(ConfigClient::new(channel)
136 .accept_compressed(CompressionEncoding::Zstd)
137 .accept_compressed(CompressionEncoding::Gzip)
138 .send_compressed(CompressionEncoding::Zstd))
139 }
140
141 #[inline]
142 fn is_started(&self) -> bool {
143 self.leader_provider.is_some()
144 }
145}