meta_client/client/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::config_client::ConfigClient;
18use api::v1::meta::{PullConfigRequest, PullConfigResponse, RequestHeader, Role};
19use common_grpc::channel_manager::ChannelManager;
20use common_meta::util;
21use common_telemetry::tracing_context::TracingContext;
22use snafu::{OptionExt, ResultExt, ensure};
23use tokio::sync::RwLock;
24use tonic::codec::CompressionEncoding;
25use tonic::transport::Channel;
26
27use crate::client::{Id, LeaderProviderRef};
28use crate::error;
29use crate::error::{InvalidResponseHeaderSnafu, Result};
30
31#[derive(Clone, Debug)]
32pub struct Client {
33    inner: Arc<RwLock<Inner>>,
34}
35
36impl Client {
37    pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
38        let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
39        Self { inner }
40    }
41
42    pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
43        let mut inner = self.inner.write().await;
44        inner.start_with(leader_provider)
45    }
46
47    pub async fn pull_config(&self) -> Result<PullConfigResponse> {
48        let inner = self.inner.read().await;
49        inner.ask_leader().await?;
50        inner.pull_config().await
51    }
52}
53
54#[derive(Debug)]
55struct Inner {
56    id: Id,
57    role: Role,
58    channel_manager: ChannelManager,
59    leader_provider: Option<LeaderProviderRef>,
60}
61
62impl Inner {
63    fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
64        Self {
65            id,
66            role,
67            channel_manager,
68            leader_provider: None,
69        }
70    }
71
72    fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
73        ensure!(
74            !self.is_started(),
75            error::IllegalGrpcClientStateSnafu {
76                err_msg: "Config client already started"
77            }
78        );
79        self.leader_provider = Some(leader_provider);
80        Ok(())
81    }
82
83    async fn ask_leader(&self) -> Result<String> {
84        let Some(leader_provider) = self.leader_provider.as_ref() else {
85            return error::IllegalGrpcClientStateSnafu {
86                err_msg: "not started",
87            }
88            .fail();
89        };
90        leader_provider.ask_leader().await
91    }
92
93    async fn pull_config(&self) -> Result<PullConfigResponse> {
94        ensure!(
95            self.is_started(),
96            error::IllegalGrpcClientStateSnafu {
97                err_msg: "Config client not start"
98            }
99        );
100
101        let leader_addr = self
102            .leader_provider
103            .as_ref()
104            .unwrap()
105            .leader()
106            .context(error::NoLeaderSnafu)?;
107        let mut client = self.make_client(&leader_addr)?;
108
109        let header = RequestHeader::new(
110            self.id,
111            self.role,
112            TracingContext::from_current_span().to_w3c(),
113        );
114        let req = PullConfigRequest {
115            header: Some(header),
116        };
117
118        let res = client
119            .pull_config(req)
120            .await
121            .map_err(error::Error::from)?
122            .into_inner();
123
124        util::check_response_header(res.header.as_ref()).context(InvalidResponseHeaderSnafu)?;
125
126        Ok(res)
127    }
128
129    fn make_client(&self, addr: impl AsRef<str>) -> Result<ConfigClient<Channel>> {
130        let channel = self
131            .channel_manager
132            .get(addr)
133            .context(error::CreateChannelSnafu)?;
134
135        Ok(ConfigClient::new(channel)
136            .accept_compressed(CompressionEncoding::Zstd)
137            .accept_compressed(CompressionEncoding::Gzip)
138            .send_compressed(CompressionEncoding::Zstd))
139    }
140
141    #[inline]
142    fn is_started(&self) -> bool {
143        self.leader_provider.is_some()
144    }
145}