// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use async_trait::async_trait;
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::ResultExt;
use tokio::time::timeout;
use tonic::transport::Channel;

use crate::client::Id;
use crate::error;
use crate::error::Result;

pub type LeaderProviderRef = Arc<dyn LeaderProvider>;

/// Provide [`MetaClient`] a Metasrv leader's address.
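///
/// # Example
///
/// A minimal sketch of the intended call pattern; the `provider:
/// LeaderProviderRef` binding is illustrative and not defined in this module:
///
/// ```ignore
/// let leader = match provider.leader() {
///     Some(addr) => addr,
///     // No cached leader yet (or it was found to be stale): ask the peers.
///     None => provider.ask_leader().await?,
/// };
/// ```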
#[async_trait]
pub trait LeaderProvider: Debug + Send + Sync {
    /// Returns the cached Metasrv leader, if any. When this returns `None`,
    /// or the cached leader turns out to be stale, call
    /// [`ask_leader`](Self::ask_leader) to discover a new one.
    fn leader(&self) -> Option<String>;

    /// Asks the peers for the current leader of the Metasrv.
    async fn ask_leader(&self) -> Result<String>;
}

pub type LeaderProviderFactoryRef = Arc<dyn LeaderProviderFactory>;

/// A factory for creating [`LeaderProvider`] instances.
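///
/// A usage sketch; the `factory` binding and the peer addresses are
/// illustrative only:
///
/// ```ignore
/// let provider: LeaderProviderRef = factory.create(&["10.0.0.1:3002", "10.0.0.2:3002"]);
/// ```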
pub trait LeaderProviderFactory: Send + Sync + Debug {
    fn create(&self, peers: &[&str]) -> LeaderProviderRef;
}

#[derive(Debug)]
struct LeadershipGroup {
    leader: Option<String>,
    peers: Vec<String>,
}

#[derive(Clone, Debug)]
pub struct AskLeader {
    id: Id,
    role: Role,
    leadership_group: Arc<RwLock<LeadershipGroup>>,
    channel_manager: ChannelManager,
    max_retry: usize,
}

impl AskLeader {
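    /// Creates an `AskLeader` that will query the given `peers` for the
    /// current Metasrv leader, retrying failed rounds up to `max_retry` times.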
    pub fn new(
        id: Id,
        role: Role,
        peers: impl Into<Vec<String>>,
        channel_manager: ChannelManager,
        max_retry: usize,
    ) -> Self {
        let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
            leader: None,
            peers: peers.into(),
        }));
        Self {
            id,
            role,
            leadership_group,
            channel_manager,
            max_retry,
        }
    }

    pub fn get_leader(&self) -> Option<String> {
        self.leadership_group.read().unwrap().leader.clone()
    }

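    // Strategy: broadcast an `AskLeaderRequest` to every known peer
    // concurrently and take the first successful answer. Each spawned task
    // pushes the leader address it learned into an mpsc channel; the receiver
    // returns as soon as any peer responds, bounded by a timeout below.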
    async fn ask_leader_inner(&self) -> Result<String> {
        let mut peers = {
            let leadership_group = self.leadership_group.read().unwrap();
            leadership_group.peers.clone()
        };
        peers.shuffle(&mut rand::rng());

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(
                self.id,
                self.role,
                TracingContext::from_current_span().to_w3c(),
            )),
        };

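        // A capacity of `peers.len()` guarantees every spawned task can
        // deliver its single answer without blocking, even after the receiver
        // has already returned with the first response.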
        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
        let channel_manager = self.channel_manager.clone();

        for addr in &peers {
            let mut client = self.create_asker(addr)?;
            let tx_clone = tx.clone();
            let req = req.clone();
            let addr = addr.clone();
            let channel_manager = channel_manager.clone();
            tokio::spawn(async move {
                match client.ask_leader(req).await {
                    Ok(res) => {
                        if let Some(endpoint) = res.into_inner().leader {
                            let _ = tx_clone.send(endpoint.addr).await;
                        } else {
                            warn!("No leader from: {addr}");
                        }
                    }
                    Err(status) => {
                        // Reset cached channel even on generic errors: the VIP may keep us on a dead
                        // backend, so forcing a reconnect gives us a chance to hit a healthy peer.
                        Self::reset_channels_with_manager(
                            &channel_manager,
                            std::slice::from_ref(&addr),
                        );
                        warn!("Failed to ask leader from: {addr}, {status}");
                    }
                }
            });
        }

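        // Wait for the first answer, bounded by the channel manager's
        // configured timeout (falling back to the keep-alive interval when
        // no timeout is configured).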
        let leader = match timeout(
            self.channel_manager
                .config()
                .timeout
                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
            rx.recv(),
        )
        .await
        {
            Ok(Some(leader)) => leader,
            Ok(None) => return error::NoLeaderSnafu.fail(),
            Err(e) => {
                // All peers timed out. Reset channels to force reconnection,
                // which may help escape dead backends in VIP/LB scenarios.
                Self::reset_channels_with_manager(&self.channel_manager, &peers);
                return Err(e).context(error::AskLeaderTimeoutSnafu);
            }
        };

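        // Cache the freshly discovered leader so `get_leader` can serve it
        // without another round trip.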
        let mut leadership_group = self.leadership_group.write().unwrap();
        leadership_group.leader = Some(leader.clone());

        Ok(leader)
    }

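    /// Asks for the current Metasrv leader, retrying `ask_leader_inner` up to
    /// `max_retry` times before failing with a retry-exceeded error.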
    pub async fn ask_leader(&self) -> Result<String> {
        let mut times = 0;
        while times < self.max_retry {
            match self.ask_leader_inner().await {
                Ok(res) => {
                    return Ok(res);
                }
                Err(err) => {
                    warn!("Failed to ask leader, source: {err}, retries: {times}");
                    times += 1;
                    continue;
                }
            }
        }

        error::RetryTimesExceededSnafu {
            msg: "Failed to ask leader",
            times: self.max_retry,
        }
        .fail()
    }

    fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
        Ok(HeartbeatClient::new(
            self.channel_manager
                .get(addr)
                .context(error::CreateChannelSnafu)?,
        ))
    }

    /// Drop cached channels for the given peers so a fresh connection is used next time.
    fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
        if peers.is_empty() {
            return;
        }

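        // `retain_channel` keeps only the channels whose address is NOT one of
        // `peers`, i.e. it drops the cached channels for exactly those peers.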
        channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
    }
}

#[async_trait]
impl LeaderProvider for AskLeader {
    fn leader(&self) -> Option<String> {
        self.get_leader()
    }

    async fn ask_leader(&self) -> Result<String> {
        // Inherent methods take precedence over trait methods during
        // resolution, so this calls `AskLeader::ask_leader` above rather
        // than recursing into this trait method.
        self.ask_leader().await
    }
}

/// The default [`LeaderProviderFactory`] implementation, which creates
/// [`AskLeader`]-backed providers.
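///
/// A construction sketch; the id, role, and retry count are illustrative, and
/// `Id` is assumed to be constructible from a plain number here:
///
/// ```ignore
/// let factory = LeaderProviderFactoryImpl::new(
///     0,
///     Role::Frontend,
///     3, // max_retry
///     ChannelManager::default(),
/// );
/// let provider = factory.create(&["10.0.0.1:3002"]);
/// ```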
#[derive(Clone, Debug)]
pub struct LeaderProviderFactoryImpl {
    id: Id,
    role: Role,
    max_retry: usize,
    channel_manager: ChannelManager,
}

impl LeaderProviderFactoryImpl {
    pub fn new(id: Id, role: Role, max_retry: usize, channel_manager: ChannelManager) -> Self {
        Self {
            id,
            role,
            max_retry,
            channel_manager,
        }
    }
}

impl LeaderProviderFactory for LeaderProviderFactoryImpl {
    fn create(&self, peers: &[&str]) -> LeaderProviderRef {
        Arc::new(AskLeader::new(
            self.id,
            self.role,
            peers.iter().map(|p| p.to_string()).collect::<Vec<_>>(),
            self.channel_manager.clone(),
            self.max_retry,
        ))
    }
}